From: Kai Moritz Date: Thu, 12 Jan 2023 22:40:12 +0000 (+0100) Subject: feat: Prepared the application for sharding X-Git-Tag: wip-sharding~33 X-Git-Url: https://juplo.de/gitweb/?a=commitdiff_plain;ds=sidebyside;h=9d61871563f4be4850cebca4fad0545d504522c3;p=demos%2Fkafka%2Fchat feat: Prepared the application for sharding - The `ChatBackendController` stores the `ChatHome`s in an array. - Reintroduced a `ChatRoomFactory` --- diff --git a/src/main/java/de/juplo/kafka/chat/backend/ChatBackendConfiguration.java b/src/main/java/de/juplo/kafka/chat/backend/ChatBackendConfiguration.java index 10e9d37b..c196b7ee 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/ChatBackendConfiguration.java +++ b/src/main/java/de/juplo/kafka/chat/backend/ChatBackendConfiguration.java @@ -16,7 +16,7 @@ public class ChatBackendConfiguration @Bean public ChatHome chatHome(ChatHomeService chatHomeService) { - return new ChatHome(chatHomeService); + return new ChatHome(chatHomeService, 0); } @Bean diff --git a/src/main/java/de/juplo/kafka/chat/backend/api/ChatBackendController.java b/src/main/java/de/juplo/kafka/chat/backend/api/ChatBackendController.java index d0d27634..1c0224bc 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/api/ChatBackendController.java +++ b/src/main/java/de/juplo/kafka/chat/backend/api/ChatBackendController.java @@ -2,6 +2,7 @@ package de.juplo.kafka.chat.backend.api; import de.juplo.kafka.chat.backend.domain.ChatHome; import de.juplo.kafka.chat.backend.domain.ChatRoom; +import de.juplo.kafka.chat.backend.domain.ChatRoomFactory; import de.juplo.kafka.chat.backend.persistence.StorageStrategy; import lombok.RequiredArgsConstructor; import org.springframework.http.codec.ServerSentEvent; @@ -16,26 +17,36 @@ import java.util.UUID; @RequiredArgsConstructor public class ChatBackendController { - private final ChatHome chatHome; + private final ChatHome[] chatHomes; + private final ShardingStrategy selectionStrategy; + private final ChatRoomFactory factory; private final StorageStrategy storageStrategy; @PostMapping("create") public Mono create(@RequestBody String name) { - return chatHome.createChatroom(name).map(ChatRoomTo::from); + UUID chatRoomId = UUID.randomUUID(); + return factory + .createChatRoom(chatRoomId, name) + .flatMap(chatRoom -> chatHomes[chatRoom.getShard()].putChatRoom(chatRoom)) + .map(ChatRoomTo::from); } @GetMapping("list") public Flux list() { - return chatHome.getChatRooms().map(chatroom -> ChatRoomTo.from(chatroom)); + return Flux + .fromArray(chatHomes) + .flatMap(chatHome -> chatHome.getChatRooms()) + .map(chatroom -> ChatRoomTo.from(chatroom)); } @GetMapping("{chatroomId}/list") public Flux list(@PathVariable UUID chatroomId) { - return chatHome + int shard = selectionStrategy.selectShard(chatroomId); + return chatHomes[shard] .getChatRoom(chatroomId) .flatMapMany(chatroom -> chatroom .getMessages() @@ -45,7 +56,10 @@ public class ChatBackendController @GetMapping("{chatroomId}") public Mono get(@PathVariable UUID chatroomId) { - return chatHome.getChatRoom(chatroomId).map(chatroom -> ChatRoomTo.from(chatroom)); + int shard = selectionStrategy.selectShard(chatroomId); + return chatHomes[shard] + .getChatRoom(chatroomId) + .map(chatroom -> ChatRoomTo.from(chatroom)); } @PutMapping("{chatroomId}/{username}/{messageId}") @@ -55,8 +69,9 @@ public class ChatBackendController @PathVariable Long messageId, @RequestBody String text) { + int shard = selectionStrategy.selectShard(chatroomId); return - chatHome + chatHomes[shard] .getChatRoom(chatroomId) .flatMap(chatroom -> put(chatroom, username, messageId, text)); } @@ -82,8 +97,9 @@ public class ChatBackendController @PathVariable String username, @PathVariable Long messageId) { + int shard = selectionStrategy.selectShard(chatroomId); return - chatHome + chatHomes[shard] .getChatRoom(chatroomId) .flatMap(chatroom -> get(chatroom, username, messageId)); } @@ -102,7 +118,8 @@ public class ChatBackendController @GetMapping(path = "{chatroomId}/listen") public Flux> listen(@PathVariable UUID chatroomId) { - return chatHome + int shard = selectionStrategy.selectShard(chatroomId); + return chatHomes[shard] .getChatRoom(chatroomId) .flatMapMany(chatroom -> listen(chatroom)); } @@ -124,6 +141,7 @@ public class ChatBackendController @PostMapping("/store") public void store() { - storageStrategy.write(chatHome.getChatRooms()); + for (int shard = 0; shard < chatHomes.length; shard++) + storageStrategy.write(chatHomes[shard].getChatRooms()); } } diff --git a/src/main/java/de/juplo/kafka/chat/backend/api/ChatRoomTo.java b/src/main/java/de/juplo/kafka/chat/backend/api/ChatRoomTo.java index 859f797f..e997e4bb 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/api/ChatRoomTo.java +++ b/src/main/java/de/juplo/kafka/chat/backend/api/ChatRoomTo.java @@ -10,13 +10,15 @@ public class ChatRoomTo { private UUID id; private String name; + private int shard; public static ChatRoomTo from(ChatRoom chatroom) { - ChatRoomTo info = new ChatRoomTo(); - info.id = chatroom.getId(); - info.name = chatroom.getName(); - return info; + ChatRoomTo to = new ChatRoomTo(); + to.id = chatroom.getId(); + to.name = chatroom.getName(); + to.shard = chatroom.getShard(); + return to; } } diff --git a/src/main/java/de/juplo/kafka/chat/backend/api/ShardingStrategy.java b/src/main/java/de/juplo/kafka/chat/backend/api/ShardingStrategy.java new file mode 100644 index 00000000..36f7e230 --- /dev/null +++ b/src/main/java/de/juplo/kafka/chat/backend/api/ShardingStrategy.java @@ -0,0 +1,9 @@ +package de.juplo.kafka.chat.backend.api; + +import java.util.UUID; + + +public interface ShardingStrategy +{ + int selectShard(UUID chatRoomId); +} diff --git a/src/main/java/de/juplo/kafka/chat/backend/domain/ChatHome.java b/src/main/java/de/juplo/kafka/chat/backend/domain/ChatHome.java index 1450d3e3..2fc0e356 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/domain/ChatHome.java +++ b/src/main/java/de/juplo/kafka/chat/backend/domain/ChatHome.java @@ -13,21 +13,22 @@ import java.util.*; public class ChatHome { private final ChatHomeService service; + private final int shard; - public Mono createChatroom(String name) + public Mono putChatRoom(ChatRoom chatRoom) { - return service.createChatRoom(UUID.randomUUID(), name); + return service.putChatRoom(chatRoom); } public Mono getChatRoom(UUID id) { return service - .getChatRoom(id) + .getChatRoom(shard, id) .switchIfEmpty(Mono.error(() -> new UnknownChatroomException(id))); } public Flux getChatRooms() { - return service.getChatRooms(); + return service.getChatRooms(shard); } } diff --git a/src/main/java/de/juplo/kafka/chat/backend/domain/ChatHomeService.java b/src/main/java/de/juplo/kafka/chat/backend/domain/ChatHomeService.java index 5743655b..d2bc508c 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/domain/ChatHomeService.java +++ b/src/main/java/de/juplo/kafka/chat/backend/domain/ChatHomeService.java @@ -8,7 +8,7 @@ import java.util.UUID; public interface ChatHomeService { - Mono createChatRoom(UUID id, String name); - Mono getChatRoom(UUID id); - Flux getChatRooms(); + Mono putChatRoom(ChatRoom chatRoom); + Mono getChatRoom(int shard, UUID id); + Flux getChatRooms(int shard); } diff --git a/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoom.java b/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoom.java index 30197432..44965852 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoom.java +++ b/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoom.java @@ -25,23 +25,31 @@ public class ChatRoom private final UUID id; @Getter private final String name; + @Getter + private final int shard; private final Clock clock; private final ChatRoomService service; private final int bufferSize; private Sinks.Many sink; + public ChatRoom( UUID id, String name, + int shard, Clock clock, ChatRoomService service, int bufferSize) { this.id = id; this.name = name; + this.shard = shard; this.clock = clock; this.service = service; this.bufferSize = bufferSize; + // @RequiredArgsConstructor unfortunately not possible, because + // the `bufferSize` is not set, if `createSink()` is called + // from the variable declaration! this.sink = createSink(); } diff --git a/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoomFactory.java b/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoomFactory.java new file mode 100644 index 00000000..324e4b02 --- /dev/null +++ b/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoomFactory.java @@ -0,0 +1,11 @@ +package de.juplo.kafka.chat.backend.domain; + +import reactor.core.publisher.Mono; + +import java.util.UUID; + + +public interface ChatRoomFactory +{ + Mono createChatRoom(UUID id, String name); +} diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryChatHomeService.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryChatHomeService.java index 96515bfc..acfd936b 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryChatHomeService.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryChatHomeService.java @@ -6,9 +6,7 @@ import lombok.extern.slf4j.Slf4j; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; -import java.time.Clock; import java.util.HashMap; -import java.util.LinkedHashMap; import java.util.Map; import java.util.UUID; @@ -16,42 +14,36 @@ import java.util.UUID; @Slf4j public class InMemoryChatHomeService implements ChatHomeService { - private final Map chatrooms; - private final Clock clock; - private final int bufferSize; + private final Map[] chatrooms; - public InMemoryChatHomeService( - Flux chatroomFlux, - Clock clock, - int bufferSize) + public InMemoryChatHomeService(int numShards, Flux chatroomFlux) { - log.debug("Creating InMemoryChatHomeService with buffer-size {} (for created ChatRoom's)", bufferSize); - this.chatrooms = new HashMap<>(); - chatroomFlux.toStream().forEach(chatroom -> chatrooms.put(chatroom.getId(), chatroom)); - this.clock = clock; - this.bufferSize = bufferSize; + log.debug("Creating InMemoryChatHomeService"); + this.chatrooms = new Map[numShards]; + for (int shard = 0; shard < numShards; shard++) + chatrooms[shard] = new HashMap<>(); + chatroomFlux + .toStream() + .forEach(chatroom -> chatrooms[chatroom.getShard()].put(chatroom.getId(), chatroom)); } @Override - public Mono createChatRoom(UUID id, String name) + public Mono putChatRoom(ChatRoom chatRoom) { - InMemoryChatRoomService service = - new InMemoryChatRoomService(new LinkedHashMap<>()); - ChatRoom chatRoom = new ChatRoom(id, name, clock, service, bufferSize); - chatrooms.put(chatRoom.getId(), chatRoom); + chatrooms[chatRoom.getShard()].put(chatRoom.getId(), chatRoom); return Mono.just(chatRoom); } @Override - public Mono getChatRoom(UUID id) + public Mono getChatRoom(int shard, UUID id) { - return Mono.justOrEmpty(chatrooms.get(id)); + return Mono.justOrEmpty(chatrooms[shard].get(id)); } @Override - public Flux getChatRooms() + public Flux getChatRooms(int shard) { - return Flux.fromStream(chatrooms.values().stream()); + return Flux.fromStream(chatrooms[shard].values().stream()); } } diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryChatRoomFactory.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryChatRoomFactory.java new file mode 100644 index 00000000..50fa7057 --- /dev/null +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryChatRoomFactory.java @@ -0,0 +1,33 @@ +package de.juplo.kafka.chat.backend.persistence.inmemory; + +import de.juplo.kafka.chat.backend.api.ShardingStrategy; +import de.juplo.kafka.chat.backend.domain.ChatRoom; +import de.juplo.kafka.chat.backend.domain.ChatRoomFactory; +import de.juplo.kafka.chat.backend.domain.ChatRoomService; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import java.time.Clock; +import java.util.UUID; + + +@RequiredArgsConstructor +@Slf4j +public class InMemoryChatRoomFactory implements ChatRoomFactory +{ + private final ShardingStrategy shardingStrategy; + private final Clock clock; + private final int bufferSize; + + + @Override + public Mono createChatRoom(UUID id, String name) + { + log.info("Creating ChatRoom with buffer-size {}", bufferSize); + int shard = shardingStrategy.selectShard(id); + ChatRoomService service = new InMemoryChatRoomService(Flux.empty()); + return Mono.just(new ChatRoom(id, name, shard, clock, service, bufferSize)); + } +} diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryChatRoomService.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryChatRoomService.java index 5a5f2b25..314e1f03 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryChatRoomService.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryChatRoomService.java @@ -16,11 +16,6 @@ public class InMemoryChatRoomService implements ChatRoomService private final LinkedHashMap messages; - public InMemoryChatRoomService(LinkedHashMap messages) - { - this.messages = messages; - } - public InMemoryChatRoomService(Flux messageFlux) { log.debug("Creating InMemoryChatRoomService"); diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryServicesConfiguration.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryServicesConfiguration.java index 3a990191..e21ead5a 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryServicesConfiguration.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryServicesConfiguration.java @@ -1,6 +1,7 @@ package de.juplo.kafka.chat.backend.persistence.inmemory; import de.juplo.kafka.chat.backend.ChatBackendProperties; +import de.juplo.kafka.chat.backend.api.ShardingStrategy; import de.juplo.kafka.chat.backend.persistence.StorageStrategy; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.context.annotation.Bean; @@ -18,14 +19,26 @@ import java.time.Clock; public class InMemoryServicesConfiguration { @Bean - InMemoryChatHomeService chatHomeService( - StorageStrategy storageStrategy, + InMemoryChatHomeService chatHomeService(StorageStrategy storageStrategy) + { + return new InMemoryChatHomeService(1, storageStrategy.read()); + } + + @Bean + InMemoryChatRoomFactory chatRoomFactory( + ShardingStrategy strategy, Clock clock, ChatBackendProperties properties) { - return new InMemoryChatHomeService( - storageStrategy.read(), + return new InMemoryChatRoomFactory( + strategy, clock, properties.getChatroomBufferSize()); } + + @Bean + ShardingStrategy shardingStrategy() + { + return chatRoomId -> 0; + } } diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/files/FilesStorageStrategy.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/files/FilesStorageStrategy.java index 1fd307f5..d043696c 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/files/FilesStorageStrategy.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/files/FilesStorageStrategy.java @@ -105,6 +105,7 @@ public class FilesStorageStrategy implements StorageStrategy .map(chatRoomTo -> new ChatRoom( chatRoomTo.getId(), chatRoomTo.getName(), + chatRoomTo.getShard(), clock, factory.create(readMessages(chatRoomTo)), bufferSize)); diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/ChatRoomTo.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/ChatRoomTo.java index 1ad8d178..27e65e41 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/ChatRoomTo.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/ChatRoomTo.java @@ -20,6 +20,7 @@ public class ChatRoomTo @Id private String id; private String name; + private int shard; private List messages; public static ChatRoomTo from(ChatRoom chatroom) @@ -27,6 +28,7 @@ public class ChatRoomTo return new ChatRoomTo( chatroom.getId().toString(), chatroom.getName(), + chatroom.getShard(), chatroom .getMessages() .map(MessageTo::from) diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/MongoDbStorageStrategy.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/MongoDbStorageStrategy.java index 08ed93b0..8429fe83 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/MongoDbStorageStrategy.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/MongoDbStorageStrategy.java @@ -37,6 +37,7 @@ public class MongoDbStorageStrategy implements StorageStrategy .map(chatRoomTo -> new ChatRoom( UUID.fromString(chatRoomTo.getId()), chatRoomTo.getName(), + chatRoomTo.getShard(), clock, factory.create( Flux diff --git a/src/test/java/de/juplo/kafka/chat/backend/api/ChatBackendControllerTest.java b/src/test/java/de/juplo/kafka/chat/backend/api/ChatBackendControllerTest.java index 403c9d52..b3504d2e 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/api/ChatBackendControllerTest.java +++ b/src/test/java/de/juplo/kafka/chat/backend/api/ChatBackendControllerTest.java @@ -36,7 +36,7 @@ public class ChatBackendControllerTest { // Given UUID chatroomId = UUID.randomUUID(); - when(chatHomeService.getChatRoom(any(UUID.class))).thenReturn(Mono.empty()); + when(chatHomeService.getChatRoom(anyInt(), any(UUID.class))).thenReturn(Mono.empty()); // When WebTestClient.ResponseSpec responseSpec = client @@ -56,7 +56,7 @@ public class ChatBackendControllerTest { // Given UUID chatroomId = UUID.randomUUID(); - when(chatHomeService.getChatRoom(any(UUID.class))).thenReturn(Mono.empty()); + when(chatHomeService.getChatRoom(anyInt(), any(UUID.class))).thenReturn(Mono.empty()); // When WebTestClient.ResponseSpec responseSpec = client @@ -77,7 +77,7 @@ public class ChatBackendControllerTest UUID chatroomId = UUID.randomUUID(); String username = "foo"; Long messageId = 66l; - when(chatHomeService.getChatRoom(any(UUID.class))).thenReturn(Mono.empty()); + when(chatHomeService.getChatRoom(anyInt(), any(UUID.class))).thenReturn(Mono.empty()); // When WebTestClient.ResponseSpec responseSpec = client @@ -103,7 +103,7 @@ public class ChatBackendControllerTest UUID chatroomId = UUID.randomUUID(); String username = "foo"; Long messageId = 66l; - when(chatHomeService.getChatRoom(any(UUID.class))).thenReturn(Mono.empty()); + when(chatHomeService.getChatRoom(anyInt(), any(UUID.class))).thenReturn(Mono.empty()); // When WebTestClient.ResponseSpec responseSpec = client @@ -126,7 +126,7 @@ public class ChatBackendControllerTest { // Given UUID chatroomId = UUID.randomUUID(); - when(chatHomeService.getChatRoom(any(UUID.class))).thenReturn(Mono.empty()); + when(chatHomeService.getChatRoom(anyInt(), any(UUID.class))).thenReturn(Mono.empty()); // When WebTestClient.ResponseSpec responseSpec = client @@ -167,9 +167,10 @@ public class ChatBackendControllerTest ChatRoom chatRoom = new ChatRoom( chatroomId, "Test-ChatRoom", + 0, Clock.systemDefaultZone(), chatRoomService, 8); - when(chatHomeService.getChatRoom(any(UUID.class))).thenReturn(Mono.just(chatRoom)); + when(chatHomeService.getChatRoom(anyInt(), any(UUID.class))).thenReturn(Mono.just(chatRoom)); Message existingMessage = new Message( key, serialNumberExistingMessage, @@ -218,9 +219,10 @@ public class ChatBackendControllerTest ChatRoom chatRoom = new ChatRoom( chatroomId, "Test-ChatRoom", + 0, Clock.systemDefaultZone(), chatRoomService, 8); - when(chatHomeService.getChatRoom(any(UUID.class))) + when(chatHomeService.getChatRoom(anyInt(), any(UUID.class))) .thenReturn(Mono.just(chatRoom)); when(chatRoomService.getMessage(any(Message.MessageKey.class))) .thenReturn(Mono.empty()); diff --git a/src/test/java/de/juplo/kafka/chat/backend/domain/ChatHomeTest.java b/src/test/java/de/juplo/kafka/chat/backend/domain/ChatHomeTest.java index aba4d4a8..2eeca407 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/domain/ChatHomeTest.java +++ b/src/test/java/de/juplo/kafka/chat/backend/domain/ChatHomeTest.java @@ -8,6 +8,7 @@ import java.time.Clock; import java.util.UUID; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import static pl.rzrz.assertj.reactor.Assertions.assertThat; @@ -24,11 +25,12 @@ public class ChatHomeTest ChatRoom chatRoom = new ChatRoom( UUID.randomUUID(), "Foo", + 0, Clock.systemDefaultZone(), mock(ChatRoomService.class), 8); - when(chatHomeService.getChatRoom(any(UUID.class))).thenReturn(Mono.just(chatRoom)); - ChatHome chatHome = new ChatHome(chatHomeService); + when(chatHomeService.getChatRoom(anyInt(), any(UUID.class))).thenReturn(Mono.just(chatRoom)); + ChatHome chatHome = new ChatHome(chatHomeService, 0); // When Mono mono = chatHome.getChatRoom(chatRoom.getId()); @@ -43,8 +45,8 @@ public class ChatHomeTest { // Given ChatHomeService chatHomeService = mock(ChatHomeService.class); - when(chatHomeService.getChatRoom(any(UUID.class))).thenReturn(Mono.empty()); - ChatHome chatHome = new ChatHome(chatHomeService); + when(chatHomeService.getChatRoom(anyInt(), any(UUID.class))).thenReturn(Mono.empty()); + ChatHome chatHome = new ChatHome(chatHomeService, 0); // When Mono mono = chatHome.getChatRoom(UUID.randomUUID()); diff --git a/src/test/java/de/juplo/kafka/chat/backend/domain/ChatRoomTest.java b/src/test/java/de/juplo/kafka/chat/backend/domain/ChatRoomTest.java index f513c407..9c418f17 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/domain/ChatRoomTest.java +++ b/src/test/java/de/juplo/kafka/chat/backend/domain/ChatRoomTest.java @@ -24,7 +24,13 @@ public class ChatRoomTest String user = "foo"; Long messageId = 1l; ChatRoomService chatRoomService = mock(ChatRoomService.class); - ChatRoom chatRoom = new ChatRoom(UUID.randomUUID(), "Foo", Clock.systemDefaultZone(), chatRoomService, 8); + ChatRoom chatRoom = new ChatRoom( + UUID.randomUUID(), + "Foo", + 0, + Clock.systemDefaultZone(), + chatRoomService, + 8); Message.MessageKey key = Message.MessageKey.of(user, messageId); LocalDateTime timestamp = LocalDateTime.now(); Message message = new Message(key, 0l, timestamp, "Bar"); @@ -45,7 +51,13 @@ public class ChatRoomTest String user = "foo"; Long messageId = 1l; ChatRoomService chatRoomService = mock(ChatRoomService.class); - ChatRoom chatRoom = new ChatRoom(UUID.randomUUID(), "Foo", Clock.systemDefaultZone(), chatRoomService, 8); + ChatRoom chatRoom = new ChatRoom( + UUID.randomUUID(), + "Foo", + 0, + Clock.systemDefaultZone(), + chatRoomService, + 8); when(chatRoomService.getMessage(any(Message.MessageKey.class))).thenReturn(Mono.empty()); // When @@ -63,7 +75,13 @@ public class ChatRoomTest String user = "foo"; Long messageId = 1l; ChatRoomService chatRoomService = mock(ChatRoomService.class); - ChatRoom chatRoom = new ChatRoom(UUID.randomUUID(), "Foo", Clock.systemDefaultZone(), chatRoomService, 8); + ChatRoom chatRoom = new ChatRoom( + UUID.randomUUID(), + "Foo", + 0, + Clock.systemDefaultZone(), + chatRoomService, + 8); Message.MessageKey key = Message.MessageKey.of(user, messageId); Clock now = Clock.fixed(Instant.now(), ZoneId.systemDefault()); LocalDateTime timestamp = LocalDateTime.now(now); @@ -87,7 +105,13 @@ public class ChatRoomTest String user = "foo"; Long messageId = 1l; ChatRoomService chatRoomService = mock(ChatRoomService.class); - ChatRoom chatRoom = new ChatRoom(UUID.randomUUID(), "Foo", Clock.systemDefaultZone(), chatRoomService, 8); + ChatRoom chatRoom = new ChatRoom( + UUID.randomUUID(), + "Foo", + 0, + Clock.systemDefaultZone(), + chatRoomService, + 8); Message.MessageKey key = Message.MessageKey.of(user, messageId); Clock now = Clock.fixed(Instant.now(), ZoneId.systemDefault()); LocalDateTime timestamp = LocalDateTime.now(now); @@ -111,7 +135,13 @@ public class ChatRoomTest String user = "foo"; Long messageId = 1l; ChatRoomService chatRoomService = mock(ChatRoomService.class); - ChatRoom chatRoom = new ChatRoom(UUID.randomUUID(), "Foo", Clock.systemDefaultZone(), chatRoomService, 8); + ChatRoom chatRoom = new ChatRoom( + UUID.randomUUID(), + "Foo", + 0, + Clock.systemDefaultZone(), + chatRoomService, + 8); Message.MessageKey key = Message.MessageKey.of(user, messageId); Clock now = Clock.fixed(Instant.now(), ZoneId.systemDefault()); LocalDateTime timestamp = LocalDateTime.now(now); diff --git a/src/test/java/de/juplo/kafka/chat/backend/persistence/AbstractStorageStrategyIT.java b/src/test/java/de/juplo/kafka/chat/backend/persistence/AbstractStorageStrategyIT.java index dc998ab4..e305c128 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/persistence/AbstractStorageStrategyIT.java +++ b/src/test/java/de/juplo/kafka/chat/backend/persistence/AbstractStorageStrategyIT.java @@ -1,13 +1,11 @@ package de.juplo.kafka.chat.backend.persistence; -import de.juplo.kafka.chat.backend.domain.ChatHome; -import de.juplo.kafka.chat.backend.domain.ChatHomeService; -import de.juplo.kafka.chat.backend.domain.ChatRoom; -import de.juplo.kafka.chat.backend.domain.Message; +import de.juplo.kafka.chat.backend.domain.*; import lombok.extern.slf4j.Slf4j; import org.junit.jupiter.api.Test; import java.util.List; +import java.util.UUID; import java.util.function.Supplier; import static pl.rzrz.assertj.reactor.Assertions.*; @@ -17,14 +15,17 @@ import static pl.rzrz.assertj.reactor.Assertions.*; public abstract class AbstractStorageStrategyIT { protected ChatHome chathome; + protected ChatRoomFactory chatRoomFactory; protected abstract StorageStrategy getStorageStrategy(); protected abstract Supplier getChatHomeServiceSupplier(); + protected abstract ChatRoomFactory getChatRoomFactory(); protected void start() { - chathome = new ChatHome(getChatHomeServiceSupplier().get()); + chathome = new ChatHome(getChatHomeServiceSupplier().get(), 0); + chatRoomFactory = getChatRoomFactory(); } protected void stop() @@ -39,7 +40,9 @@ public abstract class AbstractStorageStrategyIT assertThat(chathome.getChatRooms().toStream()).hasSize(0); - ChatRoom chatroom = chathome.createChatroom("FOO").block(); + UUID chatRoomId = UUID.randomUUID(); + ChatRoom chatroom = chatRoomFactory.createChatRoom(chatRoomId, "FOO").block(); + chathome.putChatRoom(chatroom); Message m1 = chatroom.addMessage(1l,"peter", "Hallo, ich heiße Peter!").block(); Message m2 = chatroom.addMessage(1l, "ute", "Ich bin Ute...").block(); Message m3 = chatroom.addMessage(2l, "peter", "Willst du mit mir gehen?").block(); diff --git a/src/test/java/de/juplo/kafka/chat/backend/persistence/InMemoryWithFilesStorageStrategyIT.java b/src/test/java/de/juplo/kafka/chat/backend/persistence/InMemoryWithFilesStorageStrategyIT.java index 5c88f10e..6fc9aac2 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/persistence/InMemoryWithFilesStorageStrategyIT.java +++ b/src/test/java/de/juplo/kafka/chat/backend/persistence/InMemoryWithFilesStorageStrategyIT.java @@ -3,7 +3,10 @@ package de.juplo.kafka.chat.backend.persistence; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.SerializationFeature; import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; +import de.juplo.kafka.chat.backend.api.ShardingStrategy; import de.juplo.kafka.chat.backend.domain.ChatHomeService; +import de.juplo.kafka.chat.backend.domain.ChatRoomFactory; +import de.juplo.kafka.chat.backend.persistence.inmemory.InMemoryChatRoomFactory; import de.juplo.kafka.chat.backend.persistence.storage.files.FilesStorageStrategy; import de.juplo.kafka.chat.backend.persistence.inmemory.InMemoryChatHomeService; import de.juplo.kafka.chat.backend.persistence.inmemory.InMemoryChatRoomService; @@ -52,7 +55,14 @@ public class InMemoryWithFilesStorageStrategyIT extends AbstractStorageStrategyI @Override protected Supplier getChatHomeServiceSupplier() { - return () -> new InMemoryChatHomeService(getStorageStrategy().read(), clock, 8); + return () -> new InMemoryChatHomeService(1, getStorageStrategy().read()); + } + + @Override + protected ChatRoomFactory getChatRoomFactory() + { + ShardingStrategy strategy = chatRoomId -> 0; + return new InMemoryChatRoomFactory(strategy, clock, 8); } @BeforeEach diff --git a/src/test/java/de/juplo/kafka/chat/backend/persistence/InMemoryWithMongoDbStorageStrategyIT.java b/src/test/java/de/juplo/kafka/chat/backend/persistence/InMemoryWithMongoDbStorageStrategyIT.java index ae92c9e2..ef30d947 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/persistence/InMemoryWithMongoDbStorageStrategyIT.java +++ b/src/test/java/de/juplo/kafka/chat/backend/persistence/InMemoryWithMongoDbStorageStrategyIT.java @@ -1,8 +1,11 @@ package de.juplo.kafka.chat.backend.persistence; +import de.juplo.kafka.chat.backend.api.ShardingStrategy; import de.juplo.kafka.chat.backend.domain.ChatHomeService; +import de.juplo.kafka.chat.backend.domain.ChatRoomFactory; import de.juplo.kafka.chat.backend.persistence.inmemory.InMemoryChatHomeService; import de.juplo.kafka.chat.backend.persistence.InMemoryWithMongoDbStorageStrategyIT.DataSourceInitializer; +import de.juplo.kafka.chat.backend.persistence.inmemory.InMemoryChatRoomFactory; import de.juplo.kafka.chat.backend.persistence.inmemory.InMemoryChatRoomService; import de.juplo.kafka.chat.backend.persistence.storage.mongodb.ChatRoomRepository; import de.juplo.kafka.chat.backend.persistence.storage.mongodb.MongoDbStorageStrategy; @@ -19,7 +22,6 @@ import org.springframework.context.annotation.Bean; import org.springframework.test.context.ContextConfiguration; import org.springframework.test.context.junit.jupiter.SpringExtension; import org.springframework.test.context.support.TestPropertySourceUtils; -import org.testcontainers.containers.BindMode; import org.testcontainers.containers.GenericContainer; import org.testcontainers.containers.output.Slf4jLogConsumer; import org.testcontainers.junit.jupiter.Container; @@ -52,9 +54,15 @@ public class InMemoryWithMongoDbStorageStrategyIT extends AbstractStorageStrateg @Override protected Supplier getChatHomeServiceSupplier() { - return () -> new InMemoryChatHomeService(getStorageStrategy().read(), clock, 8); + return () -> new InMemoryChatHomeService(1, getStorageStrategy().read()); } + @Override + protected ChatRoomFactory getChatRoomFactory() + { + ShardingStrategy strategy = chatRoomId -> 0; + return new InMemoryChatRoomFactory(strategy, clock, 8); + } @TestConfiguration static class InMemoryWithMongoDbStorageStrategyITConfig