refactor: A `ChatRoom` does not have to remember its shard any more
authorKai Moritz <kai@juplo.de>
Sat, 14 Jan 2023 16:33:02 +0000 (17:33 +0100)
committerKai Moritz <kai@juplo.de>
Wed, 25 Jan 2023 21:01:45 +0000 (22:01 +0100)
- The shard of a `ChatRoom` can be derived from its ID, if the configured
  `ShardingStrategy` is known.
- Because the `ShardingStrategy` is known everywhere, where the shard of a
  `ChatRoom` is needed, it always can be derived dynamically.

13 files changed:
src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoom.java
src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryChatHomeService.java
src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryChatRoomFactory.java
src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryServicesConfiguration.java
src/main/java/de/juplo/kafka/chat/backend/persistence/storage/files/FilesStorageConfiguration.java
src/main/java/de/juplo/kafka/chat/backend/persistence/storage/files/FilesStorageStrategy.java
src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/MongoDbStorageConfiguration.java
src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/MongoDbStorageStrategy.java
src/test/java/de/juplo/kafka/chat/backend/api/ChatBackendControllerTest.java
src/test/java/de/juplo/kafka/chat/backend/domain/ChatRoomTest.java
src/test/java/de/juplo/kafka/chat/backend/domain/SimpleChatHomeTest.java
src/test/java/de/juplo/kafka/chat/backend/persistence/InMemoryWithFilesStorageIT.java
src/test/java/de/juplo/kafka/chat/backend/persistence/InMemoryWithMongoDbStorageIT.java

index 4496585..1c21fb9 100644 (file)
@@ -25,8 +25,6 @@ public class ChatRoom
   private final UUID id;
   @Getter
   private final String name;
-  @Getter
-  private final int shard;
   private final Clock clock;
   private final ChatRoomService service;
   private final int bufferSize;
@@ -36,14 +34,12 @@ public class ChatRoom
   public ChatRoom(
       UUID id,
       String name,
-      int shard,
       Clock clock,
       ChatRoomService service,
       int bufferSize)
   {
     this.id = id;
     this.name = name;
-    this.shard = shard;
     this.clock = clock;
     this.service = service;
     this.bufferSize = bufferSize;
index 87fa61f..0beb7f3 100644 (file)
@@ -2,6 +2,7 @@ package de.juplo.kafka.chat.backend.persistence.inmemory;
 
 import de.juplo.kafka.chat.backend.domain.ChatRoom;
 import de.juplo.kafka.chat.backend.domain.ChatHomeService;
+import de.juplo.kafka.chat.backend.domain.ShardingStrategy;
 import lombok.extern.slf4j.Slf4j;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
@@ -12,15 +13,18 @@ import java.util.*;
 @Slf4j
 public class InMemoryChatHomeService implements ChatHomeService<InMemoryChatRoomService>
 {
+  private final ShardingStrategy shardingStrategy;
   private final Map<UUID, ChatRoom>[] chatrooms;
 
 
   public InMemoryChatHomeService(
+      ShardingStrategy shardingStrategy,
       int numShards,
       int[] ownedShards,
       Flux<ChatRoom> chatroomFlux)
   {
     log.debug("Creating InMemoryChatHomeService");
+    this.shardingStrategy = shardingStrategy;
     this.chatrooms = new Map[numShards];
     Set<Integer> owned = Arrays
         .stream(ownedShards)
@@ -37,7 +41,8 @@ public class InMemoryChatHomeService implements ChatHomeService<InMemoryChatRoom
     chatroomFlux
         .filter(chatRoom ->
         {
-          if (owned.contains(chatRoom.getShard()))
+          int shard = shardingStrategy.selectShard(chatRoom.getId());
+          if (owned.contains(shard))
           {
             return true;
           }
@@ -48,13 +53,16 @@ public class InMemoryChatHomeService implements ChatHomeService<InMemoryChatRoom
           }
         })
         .toStream()
-        .forEach(chatroom -> chatrooms[chatroom.getShard()].put(chatroom.getId(), chatroom));
+        .forEach(chatRoom ->
+        {
+          getChatRoomMapFor(chatRoom).put(chatRoom.getId(), chatRoom);
+        });
   }
 
   @Override
   public Mono<ChatRoom> putChatRoom(ChatRoom chatRoom)
   {
-    chatrooms[chatRoom.getShard()].put(chatRoom.getId(), chatRoom);
+    getChatRoomMapFor(chatRoom).put(chatRoom.getId(), chatRoom);
     return Mono.just(chatRoom);
   }
 
@@ -69,4 +77,11 @@ public class InMemoryChatHomeService implements ChatHomeService<InMemoryChatRoom
   {
     return Flux.fromStream(chatrooms[shard].values().stream());
   }
+
+
+  private Map<UUID, ChatRoom> getChatRoomMapFor(ChatRoom chatRoom)
+  {
+    int shard = shardingStrategy.selectShard(chatRoom.getId());
+    return chatrooms[shard];
+  }
 }
index 7fff359..3e1d360 100644 (file)
@@ -26,8 +26,7 @@ public class InMemoryChatRoomFactory implements ChatRoomFactory
   public Mono<ChatRoom> createChatRoom(UUID id, String name)
   {
     log.info("Creating ChatRoom with buffer-size {}", bufferSize);
-    int shard = shardingStrategy.selectShard(id);
     ChatRoomService service = new InMemoryChatRoomService(Flux.empty());
-    return Mono.just(new ChatRoom(id, name, shard, clock, service, bufferSize));
+    return Mono.just(new ChatRoom(id, name, clock, service, bufferSize));
   }
 }
index 96ef05c..1dca040 100644 (file)
@@ -45,22 +45,23 @@ public class InMemoryServicesConfiguration
       StorageStrategy storageStrategy)
   {
     int numShards = properties.getInmemory().getNumShards();
+    ShardingStrategy shardingStrategy = new KafkaLikeShardingStrategy(numShards);
     SimpleChatHome[] chatHomes = new SimpleChatHome[numShards];
     storageStrategy
         .read()
         .subscribe(chatRoom ->
         {
-          int shard = chatRoom.getShard();
+          int shard = shardingStrategy.selectShard(chatRoom.getId());
           if (chatHomes[shard] == null)
             chatHomes[shard] = new SimpleChatHome(chatHomeService, shard);
         });
-    ShardingStrategy strategy = new KafkaLikeShardingStrategy(numShards);
-    return new ShardedChatHome(chatHomes, strategy);
+    return new ShardedChatHome(chatHomes, shardingStrategy);
   }
 
   @Bean
   InMemoryChatHomeService chatHomeService(
       ChatBackendProperties properties,
+      ShardingStrategy shardingStrategy,
       StorageStrategy storageStrategy)
   {
     ShardingStrategyType sharding =
@@ -72,6 +73,7 @@ public class InMemoryServicesConfiguration
         ? new int[] { 0 }
         : properties.getInmemory().getOwnedShards();
     return new InMemoryChatHomeService(
+        shardingStrategy,
         numShards,
         ownedShards,
         storageStrategy.read());
index df730aa..0da1b95 100644 (file)
@@ -2,7 +2,6 @@ package de.juplo.kafka.chat.backend.persistence.storage.files;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import de.juplo.kafka.chat.backend.ChatBackendProperties;
-import de.juplo.kafka.chat.backend.domain.ShardingStrategy;
 import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
 import de.juplo.kafka.chat.backend.persistence.inmemory.InMemoryChatRoomService;
 import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
@@ -32,14 +31,13 @@ public class FilesStorageConfiguration
   public StorageStrategy storageStrategy(
       ChatBackendProperties properties,
       Clock clock,
-      ShardingStrategy shardingStrategy,
       ObjectMapper mapper)
   {
     return new FilesStorageStrategy(
         Paths.get(properties.getInmemory().getStorageDirectory()),
         clock,
         properties.getChatroomBufferSize(),
-        shardingStrategy,
+
         messageFlux -> new InMemoryChatRoomService(messageFlux),
         mapper);
   }
index 1e3e5ee..5d3c067 100644 (file)
@@ -5,7 +5,6 @@ import com.fasterxml.jackson.databind.JavaType;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import de.juplo.kafka.chat.backend.api.ChatRoomTo;
 import de.juplo.kafka.chat.backend.api.MessageTo;
-import de.juplo.kafka.chat.backend.domain.ShardingStrategy;
 import de.juplo.kafka.chat.backend.domain.ChatRoom;
 import de.juplo.kafka.chat.backend.domain.Message;
 import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
@@ -17,7 +16,6 @@ import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.time.Clock;
-import java.util.UUID;
 
 import static java.nio.file.StandardOpenOption.CREATE;
 import static java.nio.file.StandardOpenOption.TRUNCATE_EXISTING;
@@ -33,7 +31,6 @@ public class FilesStorageStrategy implements StorageStrategy
   private final Path storagePath;
   private final Clock clock;
   private final int bufferSize;
-  private final ShardingStrategy shardingStrategy;
   private final ChatRoomServiceFactory factory;
   private final ObjectMapper mapper;
 
@@ -105,18 +102,12 @@ public class FilesStorageStrategy implements StorageStrategy
     return Flux
         .from(new JsonFilePublisher<ChatRoomTo>(chatroomsPath(), mapper, type))
         .log()
-        .map(chatRoomTo ->
-        {
-          UUID chatRoomId = chatRoomTo.getId();
-          int shard = shardingStrategy.selectShard(chatRoomId);
-          return new ChatRoom(
+        .map(chatRoomTo -> new ChatRoom(
               chatRoomTo.getId(),
               chatRoomTo.getName(),
-              shard,
               clock,
               factory.create(readMessages(chatRoomTo)),
-              bufferSize);
-        });
+              bufferSize));
   }
 
   public void writeMessages(ChatRoomTo chatroomTo, Flux<Message> messageFlux)
index 2b33eed..22d7c26 100644 (file)
@@ -1,7 +1,6 @@
 package de.juplo.kafka.chat.backend.persistence.storage.mongodb;
 
 import de.juplo.kafka.chat.backend.ChatBackendProperties;
-import de.juplo.kafka.chat.backend.domain.ShardingStrategy;
 import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
 import de.juplo.kafka.chat.backend.persistence.inmemory.InMemoryChatRoomService;
 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
@@ -22,14 +21,12 @@ public class MongoDbStorageConfiguration
   public StorageStrategy storageStrategy(
       ChatRoomRepository chatRoomRepository,
       ChatBackendProperties properties,
-      Clock clock,
-      ShardingStrategy shardingStrategy)
+      Clock clock)
   {
     return new MongoDbStorageStrategy(
         chatRoomRepository,
         clock,
         properties.getChatroomBufferSize(),
-        shardingStrategy,
         messageFlux -> new InMemoryChatRoomService(messageFlux));
   }
 }
index d21fe2b..2300219 100644 (file)
@@ -1,6 +1,5 @@
 package de.juplo.kafka.chat.backend.persistence.storage.mongodb;
 
-import de.juplo.kafka.chat.backend.domain.ShardingStrategy;
 import de.juplo.kafka.chat.backend.domain.ChatRoom;
 import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
 import de.juplo.kafka.chat.backend.persistence.storage.files.ChatRoomServiceFactory;
@@ -19,7 +18,6 @@ public class MongoDbStorageStrategy implements StorageStrategy
   private final ChatRoomRepository repository;
   private final Clock clock;
   private final int bufferSize;
-  private final ShardingStrategy shardingStrategy;
   private final ChatRoomServiceFactory factory;
 
 
@@ -39,11 +37,9 @@ public class MongoDbStorageStrategy implements StorageStrategy
         .map(chatRoomTo ->
         {
           UUID chatRoomId = UUID.fromString(chatRoomTo.getId());
-          int shard = shardingStrategy.selectShard(chatRoomId);
           return new ChatRoom(
               chatRoomId,
               chatRoomTo.getName(),
-              shard,
               clock,
               factory.create(
                   Flux
index 1b25a11..a287028 100644 (file)
@@ -170,7 +170,6 @@ public class ChatBackendControllerTest
     ChatRoom chatRoom = new ChatRoom(
         chatroomId,
         "Test-ChatRoom",
-        0,
         Clock.systemDefaultZone(),
         chatRoomService, 8);
     when(chatHomeService.getChatRoom(anyInt(), any(UUID.class))).thenReturn(Mono.just(chatRoom));
@@ -222,7 +221,6 @@ public class ChatBackendControllerTest
     ChatRoom chatRoom = new ChatRoom(
         chatroomId,
         "Test-ChatRoom",
-        0,
         Clock.systemDefaultZone(),
         chatRoomService, 8);
     when(chatHomeService.getChatRoom(anyInt(), any(UUID.class)))
index 9c418f1..d87155c 100644 (file)
@@ -27,7 +27,6 @@ public class ChatRoomTest
     ChatRoom chatRoom = new ChatRoom(
         UUID.randomUUID(),
         "Foo",
-        0,
         Clock.systemDefaultZone(),
         chatRoomService,
         8);
@@ -54,7 +53,6 @@ public class ChatRoomTest
     ChatRoom chatRoom = new ChatRoom(
         UUID.randomUUID(),
         "Foo",
-        0,
         Clock.systemDefaultZone(),
         chatRoomService,
         8);
@@ -78,7 +76,6 @@ public class ChatRoomTest
     ChatRoom chatRoom = new ChatRoom(
         UUID.randomUUID(),
         "Foo",
-        0,
         Clock.systemDefaultZone(),
         chatRoomService,
         8);
@@ -108,7 +105,6 @@ public class ChatRoomTest
     ChatRoom chatRoom = new ChatRoom(
         UUID.randomUUID(),
         "Foo",
-        0,
         Clock.systemDefaultZone(),
         chatRoomService,
         8);
@@ -138,7 +134,6 @@ public class ChatRoomTest
     ChatRoom chatRoom = new ChatRoom(
         UUID.randomUUID(),
         "Foo",
-        0,
         Clock.systemDefaultZone(),
         chatRoomService,
         8);
index 5b53607..c9d1b63 100644 (file)
@@ -25,7 +25,6 @@ public class SimpleChatHomeTest
     ChatRoom chatRoom = new ChatRoom(
         UUID.randomUUID(),
         "Foo",
-        0,
         Clock.systemDefaultZone(),
         mock(ChatRoomService.class),
         8);
index fe7ecac..d6909df 100644 (file)
@@ -41,7 +41,6 @@ public class InMemoryWithFilesStorageIT extends AbstractStorageStrategyIT
         path,
         clock,
         8,
-        chatRoomId -> 0,
         messageFlux -> new InMemoryChatRoomService(messageFlux),
         mapper);
   }
@@ -57,6 +56,7 @@ public class InMemoryWithFilesStorageIT extends AbstractStorageStrategyIT
   protected Supplier<ChatHomeService> getChatHomeServiceSupplier()
   {
     return () -> new InMemoryChatHomeService(
+        chatRoomId -> 0,
         1,
         new int[] { 0 },
         getStorageStrategy().read());
index 9808aa3..ecc64d5 100644 (file)
@@ -55,6 +55,7 @@ public class InMemoryWithMongoDbStorageIT extends AbstractStorageStrategyIT
   protected Supplier<ChatHomeService> getChatHomeServiceSupplier()
   {
     return () -> new InMemoryChatHomeService(
+        chatRoomId -> 0,
         1,
         new int[] { 0 },
         getStorageStrategy().read());
@@ -79,7 +80,6 @@ public class InMemoryWithMongoDbStorageIT extends AbstractStorageStrategyIT
           chatRoomRepository,
           clock,
           8,
-          chatRoomId -> 0,
           messageFlux -> new InMemoryChatRoomService(messageFlux));
     }