feat: Prepared the application for sharding
authorKai Moritz <kai@juplo.de>
Thu, 12 Jan 2023 22:40:12 +0000 (23:40 +0100)
committerKai Moritz <kai@juplo.de>
Wed, 25 Jan 2023 21:00:56 +0000 (22:00 +0100)
- The `ChatBackendController` stores the `ChatHome`s in an array.
- Reintroduced a `ChatRoomFactory`

21 files changed:
src/main/java/de/juplo/kafka/chat/backend/ChatBackendConfiguration.java
src/main/java/de/juplo/kafka/chat/backend/api/ChatBackendController.java
src/main/java/de/juplo/kafka/chat/backend/api/ChatRoomTo.java
src/main/java/de/juplo/kafka/chat/backend/api/ShardingStrategy.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/chat/backend/domain/ChatHome.java
src/main/java/de/juplo/kafka/chat/backend/domain/ChatHomeService.java
src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoom.java
src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoomFactory.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryChatHomeService.java
src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryChatRoomFactory.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryChatRoomService.java
src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryServicesConfiguration.java
src/main/java/de/juplo/kafka/chat/backend/persistence/storage/files/FilesStorageStrategy.java
src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/ChatRoomTo.java
src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/MongoDbStorageStrategy.java
src/test/java/de/juplo/kafka/chat/backend/api/ChatBackendControllerTest.java
src/test/java/de/juplo/kafka/chat/backend/domain/ChatHomeTest.java
src/test/java/de/juplo/kafka/chat/backend/domain/ChatRoomTest.java
src/test/java/de/juplo/kafka/chat/backend/persistence/AbstractStorageStrategyIT.java
src/test/java/de/juplo/kafka/chat/backend/persistence/InMemoryWithFilesStorageStrategyIT.java
src/test/java/de/juplo/kafka/chat/backend/persistence/InMemoryWithMongoDbStorageStrategyIT.java

index 10e9d37..c196b7e 100644 (file)
@@ -16,7 +16,7 @@ public class ChatBackendConfiguration
   @Bean
   public ChatHome chatHome(ChatHomeService chatHomeService)
   {
-    return new ChatHome(chatHomeService);
+    return new ChatHome(chatHomeService, 0);
   }
 
   @Bean
index d0d2763..1c0224b 100644 (file)
@@ -2,6 +2,7 @@ package de.juplo.kafka.chat.backend.api;
 
 import de.juplo.kafka.chat.backend.domain.ChatHome;
 import de.juplo.kafka.chat.backend.domain.ChatRoom;
+import de.juplo.kafka.chat.backend.domain.ChatRoomFactory;
 import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
 import lombok.RequiredArgsConstructor;
 import org.springframework.http.codec.ServerSentEvent;
@@ -16,26 +17,36 @@ import java.util.UUID;
 @RequiredArgsConstructor
 public class ChatBackendController
 {
-  private final ChatHome chatHome;
+  private final ChatHome[] chatHomes;
+  private final ShardingStrategy selectionStrategy;
+  private final ChatRoomFactory factory;
   private final StorageStrategy storageStrategy;
 
 
   @PostMapping("create")
   public Mono<ChatRoomTo> create(@RequestBody String name)
   {
-    return chatHome.createChatroom(name).map(ChatRoomTo::from);
+    UUID chatRoomId = UUID.randomUUID();
+    return factory
+        .createChatRoom(chatRoomId, name)
+        .flatMap(chatRoom -> chatHomes[chatRoom.getShard()].putChatRoom(chatRoom))
+        .map(ChatRoomTo::from);
   }
 
   @GetMapping("list")
   public Flux<ChatRoomTo> list()
   {
-    return chatHome.getChatRooms().map(chatroom -> ChatRoomTo.from(chatroom));
+    return Flux
+        .fromArray(chatHomes)
+        .flatMap(chatHome -> chatHome.getChatRooms())
+        .map(chatroom -> ChatRoomTo.from(chatroom));
   }
 
   @GetMapping("{chatroomId}/list")
   public Flux<MessageTo> list(@PathVariable UUID chatroomId)
   {
-    return chatHome
+    int shard = selectionStrategy.selectShard(chatroomId);
+    return chatHomes[shard]
         .getChatRoom(chatroomId)
         .flatMapMany(chatroom -> chatroom
             .getMessages()
@@ -45,7 +56,10 @@ public class ChatBackendController
   @GetMapping("{chatroomId}")
   public Mono<ChatRoomTo> get(@PathVariable UUID chatroomId)
   {
-    return chatHome.getChatRoom(chatroomId).map(chatroom -> ChatRoomTo.from(chatroom));
+    int shard = selectionStrategy.selectShard(chatroomId);
+    return chatHomes[shard]
+        .getChatRoom(chatroomId)
+        .map(chatroom -> ChatRoomTo.from(chatroom));
   }
 
   @PutMapping("{chatroomId}/{username}/{messageId}")
@@ -55,8 +69,9 @@ public class ChatBackendController
       @PathVariable Long messageId,
       @RequestBody String text)
   {
+    int shard = selectionStrategy.selectShard(chatroomId);
     return
-        chatHome
+        chatHomes[shard]
             .getChatRoom(chatroomId)
             .flatMap(chatroom -> put(chatroom, username, messageId, text));
   }
@@ -82,8 +97,9 @@ public class ChatBackendController
       @PathVariable String username,
       @PathVariable Long messageId)
   {
+    int shard = selectionStrategy.selectShard(chatroomId);
     return
-        chatHome
+        chatHomes[shard]
             .getChatRoom(chatroomId)
             .flatMap(chatroom -> get(chatroom, username, messageId));
   }
@@ -102,7 +118,8 @@ public class ChatBackendController
   @GetMapping(path = "{chatroomId}/listen")
   public Flux<ServerSentEvent<MessageTo>> listen(@PathVariable UUID chatroomId)
   {
-    return chatHome
+    int shard = selectionStrategy.selectShard(chatroomId);
+    return chatHomes[shard]
         .getChatRoom(chatroomId)
         .flatMapMany(chatroom -> listen(chatroom));
   }
@@ -124,6 +141,7 @@ public class ChatBackendController
   @PostMapping("/store")
   public void store()
   {
-    storageStrategy.write(chatHome.getChatRooms());
+    for (int shard = 0; shard < chatHomes.length; shard++)
+      storageStrategy.write(chatHomes[shard].getChatRooms());
   }
 }
index 859f797..e997e4b 100644 (file)
@@ -10,13 +10,15 @@ public class ChatRoomTo
 {
   private UUID id;
   private String name;
+  private int shard;
 
 
   public static ChatRoomTo from(ChatRoom chatroom)
   {
-    ChatRoomTo info = new ChatRoomTo();
-    info.id = chatroom.getId();
-    info.name = chatroom.getName();
-    return info;
+    ChatRoomTo to = new ChatRoomTo();
+    to.id = chatroom.getId();
+    to.name = chatroom.getName();
+    to.shard = chatroom.getShard();
+    return to;
   }
 }
diff --git a/src/main/java/de/juplo/kafka/chat/backend/api/ShardingStrategy.java b/src/main/java/de/juplo/kafka/chat/backend/api/ShardingStrategy.java
new file mode 100644 (file)
index 0000000..36f7e23
--- /dev/null
@@ -0,0 +1,9 @@
+package de.juplo.kafka.chat.backend.api;
+
+import java.util.UUID;
+
+
+public interface ShardingStrategy
+{
+  int selectShard(UUID chatRoomId);
+}
index 1450d3e..2fc0e35 100644 (file)
@@ -13,21 +13,22 @@ import java.util.*;
 public class ChatHome
 {
   private final ChatHomeService service;
+  private final int shard;
 
-  public Mono<ChatRoom> createChatroom(String name)
+  public Mono<ChatRoom> putChatRoom(ChatRoom chatRoom)
   {
-    return service.createChatRoom(UUID.randomUUID(), name);
+    return service.putChatRoom(chatRoom);
   }
 
   public Mono<ChatRoom> getChatRoom(UUID id)
   {
     return service
-        .getChatRoom(id)
+        .getChatRoom(shard, id)
         .switchIfEmpty(Mono.error(() -> new UnknownChatroomException(id)));
   }
 
   public Flux<ChatRoom> getChatRooms()
   {
-    return service.getChatRooms();
+    return service.getChatRooms(shard);
   }
 }
index 5743655..d2bc508 100644 (file)
@@ -8,7 +8,7 @@ import java.util.UUID;
 
 public interface ChatHomeService<T extends ChatRoomService>
 {
-  Mono<ChatRoom> createChatRoom(UUID id, String name);
-  Mono<ChatRoom> getChatRoom(UUID id);
-  Flux<ChatRoom> getChatRooms();
+  Mono<ChatRoom> putChatRoom(ChatRoom chatRoom);
+  Mono<ChatRoom> getChatRoom(int shard, UUID id);
+  Flux<ChatRoom> getChatRooms(int shard);
 }
index 3019743..4496585 100644 (file)
@@ -25,23 +25,31 @@ public class ChatRoom
   private final UUID id;
   @Getter
   private final String name;
+  @Getter
+  private final int shard;
   private final Clock clock;
   private final ChatRoomService service;
   private final int bufferSize;
   private Sinks.Many<Message> sink;
 
+
   public ChatRoom(
       UUID id,
       String name,
+      int shard,
       Clock clock,
       ChatRoomService service,
       int bufferSize)
   {
     this.id = id;
     this.name = name;
+    this.shard = shard;
     this.clock = clock;
     this.service = service;
     this.bufferSize = bufferSize;
+    // @RequiredArgsConstructor unfortunately not possible, because
+    // the `bufferSize` is not set, if `createSink()` is called
+    // from the variable declaration!
     this.sink = createSink();
   }
 
diff --git a/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoomFactory.java b/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoomFactory.java
new file mode 100644 (file)
index 0000000..324e4b0
--- /dev/null
@@ -0,0 +1,11 @@
+package de.juplo.kafka.chat.backend.domain;
+
+import reactor.core.publisher.Mono;
+
+import java.util.UUID;
+
+
+public interface ChatRoomFactory
+{
+  Mono<ChatRoom> createChatRoom(UUID id, String name);
+}
index 96515bf..acfd936 100644 (file)
@@ -6,9 +6,7 @@ import lombok.extern.slf4j.Slf4j;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
-import java.time.Clock;
 import java.util.HashMap;
-import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.UUID;
 
@@ -16,42 +14,36 @@ import java.util.UUID;
 @Slf4j
 public class InMemoryChatHomeService implements ChatHomeService<InMemoryChatRoomService>
 {
-  private final Map<UUID, ChatRoom> chatrooms;
-  private final Clock clock;
-  private final int bufferSize;
+  private final Map<UUID, ChatRoom>[] chatrooms;
 
 
-  public InMemoryChatHomeService(
-      Flux<ChatRoom> chatroomFlux,
-      Clock clock,
-      int bufferSize)
+  public InMemoryChatHomeService(int numShards, Flux<ChatRoom> chatroomFlux)
   {
-    log.debug("Creating InMemoryChatHomeService with buffer-size {} (for created ChatRoom's)", bufferSize);
-    this.chatrooms = new HashMap<>();
-    chatroomFlux.toStream().forEach(chatroom -> chatrooms.put(chatroom.getId(), chatroom));
-    this.clock = clock;
-    this.bufferSize = bufferSize;
+    log.debug("Creating InMemoryChatHomeService");
+    this.chatrooms = new Map[numShards];
+    for (int shard = 0; shard < numShards; shard++)
+        chatrooms[shard] = new HashMap<>();
+    chatroomFlux
+        .toStream()
+        .forEach(chatroom -> chatrooms[chatroom.getShard()].put(chatroom.getId(), chatroom));
   }
 
   @Override
-  public Mono<ChatRoom> createChatRoom(UUID id, String name)
+  public Mono<ChatRoom> putChatRoom(ChatRoom chatRoom)
   {
-    InMemoryChatRoomService service =
-        new InMemoryChatRoomService(new LinkedHashMap<>());
-    ChatRoom chatRoom = new ChatRoom(id, name, clock, service, bufferSize);
-    chatrooms.put(chatRoom.getId(), chatRoom);
+    chatrooms[chatRoom.getShard()].put(chatRoom.getId(), chatRoom);
     return Mono.just(chatRoom);
   }
 
   @Override
-  public Mono<ChatRoom> getChatRoom(UUID id)
+  public Mono<ChatRoom> getChatRoom(int shard, UUID id)
   {
-    return Mono.justOrEmpty(chatrooms.get(id));
+    return Mono.justOrEmpty(chatrooms[shard].get(id));
   }
 
   @Override
-  public Flux<ChatRoom> getChatRooms()
+  public Flux<ChatRoom> getChatRooms(int shard)
   {
-    return Flux.fromStream(chatrooms.values().stream());
+    return Flux.fromStream(chatrooms[shard].values().stream());
   }
 }
diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryChatRoomFactory.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryChatRoomFactory.java
new file mode 100644 (file)
index 0000000..50fa705
--- /dev/null
@@ -0,0 +1,33 @@
+package de.juplo.kafka.chat.backend.persistence.inmemory;
+
+import de.juplo.kafka.chat.backend.api.ShardingStrategy;
+import de.juplo.kafka.chat.backend.domain.ChatRoom;
+import de.juplo.kafka.chat.backend.domain.ChatRoomFactory;
+import de.juplo.kafka.chat.backend.domain.ChatRoomService;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+import java.time.Clock;
+import java.util.UUID;
+
+
+@RequiredArgsConstructor
+@Slf4j
+public class InMemoryChatRoomFactory implements ChatRoomFactory
+{
+  private final ShardingStrategy shardingStrategy;
+  private final Clock clock;
+  private final int bufferSize;
+
+
+  @Override
+  public Mono<ChatRoom> createChatRoom(UUID id, String name)
+  {
+    log.info("Creating ChatRoom with buffer-size {}", bufferSize);
+    int shard = shardingStrategy.selectShard(id);
+    ChatRoomService service = new InMemoryChatRoomService(Flux.empty());
+    return Mono.just(new ChatRoom(id, name, shard, clock, service, bufferSize));
+  }
+}
index 5a5f2b2..314e1f0 100644 (file)
@@ -16,11 +16,6 @@ public class InMemoryChatRoomService implements ChatRoomService
   private final LinkedHashMap<Message.MessageKey, Message> messages;
 
 
-  public InMemoryChatRoomService(LinkedHashMap<Message.MessageKey, Message> messages)
-  {
-    this.messages = messages;
-  }
-
   public InMemoryChatRoomService(Flux<Message> messageFlux)
   {
     log.debug("Creating InMemoryChatRoomService");
index 3a99019..e21ead5 100644 (file)
@@ -1,6 +1,7 @@
 package de.juplo.kafka.chat.backend.persistence.inmemory;
 
 import de.juplo.kafka.chat.backend.ChatBackendProperties;
+import de.juplo.kafka.chat.backend.api.ShardingStrategy;
 import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
 import org.springframework.context.annotation.Bean;
@@ -18,14 +19,26 @@ import java.time.Clock;
 public class InMemoryServicesConfiguration
 {
   @Bean
-  InMemoryChatHomeService chatHomeService(
-      StorageStrategy storageStrategy,
+  InMemoryChatHomeService chatHomeService(StorageStrategy storageStrategy)
+  {
+    return new InMemoryChatHomeService(1, storageStrategy.read());
+  }
+
+  @Bean
+  InMemoryChatRoomFactory chatRoomFactory(
+      ShardingStrategy strategy,
       Clock clock,
       ChatBackendProperties properties)
   {
-    return new InMemoryChatHomeService(
-        storageStrategy.read(),
+    return new InMemoryChatRoomFactory(
+        strategy,
         clock,
         properties.getChatroomBufferSize());
   }
+
+  @Bean
+  ShardingStrategy shardingStrategy()
+  {
+    return chatRoomId -> 0;
+  }
 }
index 1fd307f..d043696 100644 (file)
@@ -105,6 +105,7 @@ public class FilesStorageStrategy implements StorageStrategy
         .map(chatRoomTo -> new ChatRoom(
             chatRoomTo.getId(),
             chatRoomTo.getName(),
+            chatRoomTo.getShard(),
             clock,
             factory.create(readMessages(chatRoomTo)),
             bufferSize));
index 1ad8d17..27e65e4 100644 (file)
@@ -20,6 +20,7 @@ public class ChatRoomTo
   @Id
   private String id;
   private String name;
+  private int shard;
   private List<MessageTo> messages;
 
   public static ChatRoomTo from(ChatRoom chatroom)
@@ -27,6 +28,7 @@ public class ChatRoomTo
     return new ChatRoomTo(
         chatroom.getId().toString(),
         chatroom.getName(),
+        chatroom.getShard(),
         chatroom
             .getMessages()
             .map(MessageTo::from)
index 08ed93b..8429fe8 100644 (file)
@@ -37,6 +37,7 @@ public class MongoDbStorageStrategy implements StorageStrategy
         .map(chatRoomTo -> new ChatRoom(
             UUID.fromString(chatRoomTo.getId()),
             chatRoomTo.getName(),
+            chatRoomTo.getShard(),
             clock,
             factory.create(
                 Flux
index 403c9d5..b3504d2 100644 (file)
@@ -36,7 +36,7 @@ public class ChatBackendControllerTest
   {
     // Given
     UUID chatroomId = UUID.randomUUID();
-    when(chatHomeService.getChatRoom(any(UUID.class))).thenReturn(Mono.empty());
+    when(chatHomeService.getChatRoom(anyInt(), any(UUID.class))).thenReturn(Mono.empty());
 
     // When
     WebTestClient.ResponseSpec responseSpec = client
@@ -56,7 +56,7 @@ public class ChatBackendControllerTest
   {
     // Given
     UUID chatroomId = UUID.randomUUID();
-    when(chatHomeService.getChatRoom(any(UUID.class))).thenReturn(Mono.empty());
+    when(chatHomeService.getChatRoom(anyInt(), any(UUID.class))).thenReturn(Mono.empty());
 
     // When
     WebTestClient.ResponseSpec responseSpec = client
@@ -77,7 +77,7 @@ public class ChatBackendControllerTest
     UUID chatroomId = UUID.randomUUID();
     String username = "foo";
     Long messageId = 66l;
-    when(chatHomeService.getChatRoom(any(UUID.class))).thenReturn(Mono.empty());
+    when(chatHomeService.getChatRoom(anyInt(), any(UUID.class))).thenReturn(Mono.empty());
 
     // When
     WebTestClient.ResponseSpec responseSpec = client
@@ -103,7 +103,7 @@ public class ChatBackendControllerTest
     UUID chatroomId = UUID.randomUUID();
     String username = "foo";
     Long messageId = 66l;
-    when(chatHomeService.getChatRoom(any(UUID.class))).thenReturn(Mono.empty());
+    when(chatHomeService.getChatRoom(anyInt(), any(UUID.class))).thenReturn(Mono.empty());
 
     // When
     WebTestClient.ResponseSpec responseSpec = client
@@ -126,7 +126,7 @@ public class ChatBackendControllerTest
   {
     // Given
     UUID chatroomId = UUID.randomUUID();
-    when(chatHomeService.getChatRoom(any(UUID.class))).thenReturn(Mono.empty());
+    when(chatHomeService.getChatRoom(anyInt(), any(UUID.class))).thenReturn(Mono.empty());
 
     // When
     WebTestClient.ResponseSpec responseSpec = client
@@ -167,9 +167,10 @@ public class ChatBackendControllerTest
     ChatRoom chatRoom = new ChatRoom(
         chatroomId,
         "Test-ChatRoom",
+        0,
         Clock.systemDefaultZone(),
         chatRoomService, 8);
-    when(chatHomeService.getChatRoom(any(UUID.class))).thenReturn(Mono.just(chatRoom));
+    when(chatHomeService.getChatRoom(anyInt(), any(UUID.class))).thenReturn(Mono.just(chatRoom));
     Message existingMessage = new Message(
         key,
         serialNumberExistingMessage,
@@ -218,9 +219,10 @@ public class ChatBackendControllerTest
     ChatRoom chatRoom = new ChatRoom(
         chatroomId,
         "Test-ChatRoom",
+        0,
         Clock.systemDefaultZone(),
         chatRoomService, 8);
-    when(chatHomeService.getChatRoom(any(UUID.class)))
+    when(chatHomeService.getChatRoom(anyInt(), any(UUID.class)))
         .thenReturn(Mono.just(chatRoom));
     when(chatRoomService.getMessage(any(Message.MessageKey.class)))
         .thenReturn(Mono.empty());
index aba4d4a..2eeca40 100644 (file)
@@ -8,6 +8,7 @@ import java.time.Clock;
 import java.util.UUID;
 
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 import static pl.rzrz.assertj.reactor.Assertions.assertThat;
@@ -24,11 +25,12 @@ public class ChatHomeTest
     ChatRoom chatRoom = new ChatRoom(
         UUID.randomUUID(),
         "Foo",
+        0,
         Clock.systemDefaultZone(),
         mock(ChatRoomService.class),
         8);
-    when(chatHomeService.getChatRoom(any(UUID.class))).thenReturn(Mono.just(chatRoom));
-    ChatHome chatHome = new ChatHome(chatHomeService);
+    when(chatHomeService.getChatRoom(anyInt(), any(UUID.class))).thenReturn(Mono.just(chatRoom));
+    ChatHome chatHome = new ChatHome(chatHomeService, 0);
 
     // When
     Mono<ChatRoom> mono = chatHome.getChatRoom(chatRoom.getId());
@@ -43,8 +45,8 @@ public class ChatHomeTest
   {
     // Given
     ChatHomeService chatHomeService = mock(ChatHomeService.class);
-    when(chatHomeService.getChatRoom(any(UUID.class))).thenReturn(Mono.empty());
-    ChatHome chatHome = new ChatHome(chatHomeService);
+    when(chatHomeService.getChatRoom(anyInt(), any(UUID.class))).thenReturn(Mono.empty());
+    ChatHome chatHome = new ChatHome(chatHomeService, 0);
 
     // When
     Mono<ChatRoom> mono = chatHome.getChatRoom(UUID.randomUUID());
index f513c40..9c418f1 100644 (file)
@@ -24,7 +24,13 @@ public class ChatRoomTest
     String user = "foo";
     Long messageId = 1l;
     ChatRoomService chatRoomService = mock(ChatRoomService.class);
-    ChatRoom chatRoom = new ChatRoom(UUID.randomUUID(), "Foo", Clock.systemDefaultZone(), chatRoomService, 8);
+    ChatRoom chatRoom = new ChatRoom(
+        UUID.randomUUID(),
+        "Foo",
+        0,
+        Clock.systemDefaultZone(),
+        chatRoomService,
+        8);
     Message.MessageKey key = Message.MessageKey.of(user, messageId);
     LocalDateTime timestamp = LocalDateTime.now();
     Message message = new Message(key, 0l, timestamp, "Bar");
@@ -45,7 +51,13 @@ public class ChatRoomTest
     String user = "foo";
     Long messageId = 1l;
     ChatRoomService chatRoomService = mock(ChatRoomService.class);
-    ChatRoom chatRoom = new ChatRoom(UUID.randomUUID(), "Foo", Clock.systemDefaultZone(), chatRoomService, 8);
+    ChatRoom chatRoom = new ChatRoom(
+        UUID.randomUUID(),
+        "Foo",
+        0,
+        Clock.systemDefaultZone(),
+        chatRoomService,
+        8);
     when(chatRoomService.getMessage(any(Message.MessageKey.class))).thenReturn(Mono.empty());
 
     // When
@@ -63,7 +75,13 @@ public class ChatRoomTest
     String user = "foo";
     Long messageId = 1l;
     ChatRoomService chatRoomService = mock(ChatRoomService.class);
-    ChatRoom chatRoom = new ChatRoom(UUID.randomUUID(), "Foo", Clock.systemDefaultZone(), chatRoomService, 8);
+    ChatRoom chatRoom = new ChatRoom(
+        UUID.randomUUID(),
+        "Foo",
+        0,
+        Clock.systemDefaultZone(),
+        chatRoomService,
+        8);
     Message.MessageKey key = Message.MessageKey.of(user, messageId);
     Clock now = Clock.fixed(Instant.now(), ZoneId.systemDefault());
     LocalDateTime timestamp = LocalDateTime.now(now);
@@ -87,7 +105,13 @@ public class ChatRoomTest
     String user = "foo";
     Long messageId = 1l;
     ChatRoomService chatRoomService = mock(ChatRoomService.class);
-    ChatRoom chatRoom = new ChatRoom(UUID.randomUUID(), "Foo", Clock.systemDefaultZone(), chatRoomService, 8);
+    ChatRoom chatRoom = new ChatRoom(
+        UUID.randomUUID(),
+        "Foo",
+        0,
+        Clock.systemDefaultZone(),
+        chatRoomService,
+        8);
     Message.MessageKey key = Message.MessageKey.of(user, messageId);
     Clock now = Clock.fixed(Instant.now(), ZoneId.systemDefault());
     LocalDateTime timestamp = LocalDateTime.now(now);
@@ -111,7 +135,13 @@ public class ChatRoomTest
     String user = "foo";
     Long messageId = 1l;
     ChatRoomService chatRoomService = mock(ChatRoomService.class);
-    ChatRoom chatRoom = new ChatRoom(UUID.randomUUID(), "Foo", Clock.systemDefaultZone(), chatRoomService, 8);
+    ChatRoom chatRoom = new ChatRoom(
+        UUID.randomUUID(),
+        "Foo",
+        0,
+        Clock.systemDefaultZone(),
+        chatRoomService,
+        8);
     Message.MessageKey key = Message.MessageKey.of(user, messageId);
     Clock now = Clock.fixed(Instant.now(), ZoneId.systemDefault());
     LocalDateTime timestamp = LocalDateTime.now(now);
index dc998ab..e305c12 100644 (file)
@@ -1,13 +1,11 @@
 package de.juplo.kafka.chat.backend.persistence;
 
-import de.juplo.kafka.chat.backend.domain.ChatHome;
-import de.juplo.kafka.chat.backend.domain.ChatHomeService;
-import de.juplo.kafka.chat.backend.domain.ChatRoom;
-import de.juplo.kafka.chat.backend.domain.Message;
+import de.juplo.kafka.chat.backend.domain.*;
 import lombok.extern.slf4j.Slf4j;
 import org.junit.jupiter.api.Test;
 
 import java.util.List;
+import java.util.UUID;
 import java.util.function.Supplier;
 
 import static pl.rzrz.assertj.reactor.Assertions.*;
@@ -17,14 +15,17 @@ import static pl.rzrz.assertj.reactor.Assertions.*;
 public abstract class AbstractStorageStrategyIT
 {
   protected ChatHome chathome;
+  protected ChatRoomFactory chatRoomFactory;
 
 
   protected abstract StorageStrategy getStorageStrategy();
   protected abstract Supplier<ChatHomeService> getChatHomeServiceSupplier();
+  protected abstract ChatRoomFactory getChatRoomFactory();
 
   protected void start()
   {
-    chathome = new ChatHome(getChatHomeServiceSupplier().get());
+    chathome = new ChatHome(getChatHomeServiceSupplier().get(), 0);
+    chatRoomFactory = getChatRoomFactory();
   }
 
   protected void stop()
@@ -39,7 +40,9 @@ public abstract class AbstractStorageStrategyIT
 
     assertThat(chathome.getChatRooms().toStream()).hasSize(0);
 
-    ChatRoom chatroom = chathome.createChatroom("FOO").block();
+    UUID chatRoomId = UUID.randomUUID();
+    ChatRoom chatroom = chatRoomFactory.createChatRoom(chatRoomId, "FOO").block();
+    chathome.putChatRoom(chatroom);
     Message m1 = chatroom.addMessage(1l,"peter", "Hallo, ich heiße Peter!").block();
     Message m2 = chatroom.addMessage(1l, "ute", "Ich bin Ute...").block();
     Message m3 = chatroom.addMessage(2l, "peter", "Willst du mit mir gehen?").block();
index 5c88f10..6fc9aac 100644 (file)
@@ -3,7 +3,10 @@ package de.juplo.kafka.chat.backend.persistence;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.SerializationFeature;
 import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
+import de.juplo.kafka.chat.backend.api.ShardingStrategy;
 import de.juplo.kafka.chat.backend.domain.ChatHomeService;
+import de.juplo.kafka.chat.backend.domain.ChatRoomFactory;
+import de.juplo.kafka.chat.backend.persistence.inmemory.InMemoryChatRoomFactory;
 import de.juplo.kafka.chat.backend.persistence.storage.files.FilesStorageStrategy;
 import de.juplo.kafka.chat.backend.persistence.inmemory.InMemoryChatHomeService;
 import de.juplo.kafka.chat.backend.persistence.inmemory.InMemoryChatRoomService;
@@ -52,7 +55,14 @@ public class InMemoryWithFilesStorageStrategyIT extends AbstractStorageStrategyI
   @Override
   protected Supplier<ChatHomeService> getChatHomeServiceSupplier()
   {
-    return () -> new InMemoryChatHomeService(getStorageStrategy().read(), clock, 8);
+    return () -> new InMemoryChatHomeService(1, getStorageStrategy().read());
+  }
+
+  @Override
+  protected ChatRoomFactory getChatRoomFactory()
+  {
+    ShardingStrategy strategy = chatRoomId -> 0;
+    return new InMemoryChatRoomFactory(strategy, clock, 8);
   }
 
   @BeforeEach
index ae92c9e..ef30d94 100644 (file)
@@ -1,8 +1,11 @@
 package de.juplo.kafka.chat.backend.persistence;
 
+import de.juplo.kafka.chat.backend.api.ShardingStrategy;
 import de.juplo.kafka.chat.backend.domain.ChatHomeService;
+import de.juplo.kafka.chat.backend.domain.ChatRoomFactory;
 import de.juplo.kafka.chat.backend.persistence.inmemory.InMemoryChatHomeService;
 import de.juplo.kafka.chat.backend.persistence.InMemoryWithMongoDbStorageStrategyIT.DataSourceInitializer;
+import de.juplo.kafka.chat.backend.persistence.inmemory.InMemoryChatRoomFactory;
 import de.juplo.kafka.chat.backend.persistence.inmemory.InMemoryChatRoomService;
 import de.juplo.kafka.chat.backend.persistence.storage.mongodb.ChatRoomRepository;
 import de.juplo.kafka.chat.backend.persistence.storage.mongodb.MongoDbStorageStrategy;
@@ -19,7 +22,6 @@ import org.springframework.context.annotation.Bean;
 import org.springframework.test.context.ContextConfiguration;
 import org.springframework.test.context.junit.jupiter.SpringExtension;
 import org.springframework.test.context.support.TestPropertySourceUtils;
-import org.testcontainers.containers.BindMode;
 import org.testcontainers.containers.GenericContainer;
 import org.testcontainers.containers.output.Slf4jLogConsumer;
 import org.testcontainers.junit.jupiter.Container;
@@ -52,9 +54,15 @@ public class InMemoryWithMongoDbStorageStrategyIT extends AbstractStorageStrateg
   @Override
   protected Supplier<ChatHomeService> getChatHomeServiceSupplier()
   {
-    return () -> new InMemoryChatHomeService(getStorageStrategy().read(), clock, 8);
+    return () -> new InMemoryChatHomeService(1, getStorageStrategy().read());
   }
 
+  @Override
+  protected ChatRoomFactory getChatRoomFactory()
+  {
+    ShardingStrategy strategy = chatRoomId -> 0;
+    return new InMemoryChatRoomFactory(strategy, clock, 8);
+  }
 
   @TestConfiguration
   static class InMemoryWithMongoDbStorageStrategyITConfig