]> juplo.de Git - demos/kafka/chat/commitdiff
refactor: A `ChatRoom` does not have to remember its shard any more
authorKai Moritz <kai@juplo.de>
Sat, 14 Jan 2023 16:33:02 +0000 (17:33 +0100)
committerKai Moritz <kai@juplo.de>
Wed, 25 Jan 2023 21:01:45 +0000 (22:01 +0100)
- The shard of a `ChatRoom` can be derived from its ID, if the configured
  `ShardingStrategy` is known.
- Because the `ShardingStrategy` is known everywhere, where the shard of a
  `ChatRoom` is needed, it always can be derived dynamically.

13 files changed:
src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoom.java
src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryChatHomeService.java
src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryChatRoomFactory.java
src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryServicesConfiguration.java
src/main/java/de/juplo/kafka/chat/backend/persistence/storage/files/FilesStorageConfiguration.java
src/main/java/de/juplo/kafka/chat/backend/persistence/storage/files/FilesStorageStrategy.java
src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/MongoDbStorageConfiguration.java
src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/MongoDbStorageStrategy.java
src/test/java/de/juplo/kafka/chat/backend/api/ChatBackendControllerTest.java
src/test/java/de/juplo/kafka/chat/backend/domain/ChatRoomTest.java
src/test/java/de/juplo/kafka/chat/backend/domain/SimpleChatHomeTest.java
src/test/java/de/juplo/kafka/chat/backend/persistence/InMemoryWithFilesStorageIT.java
src/test/java/de/juplo/kafka/chat/backend/persistence/InMemoryWithMongoDbStorageIT.java

index 4496585232c0ea71cdba3fc5c1be5ab6277bd88b..1c21fb979a1f2f4c5a061b2c1243d39f24d4cb5c 100644 (file)
@@ -25,8 +25,6 @@ public class ChatRoom
   private final UUID id;
   @Getter
   private final String name;
-  @Getter
-  private final int shard;
   private final Clock clock;
   private final ChatRoomService service;
   private final int bufferSize;
@@ -36,14 +34,12 @@ public class ChatRoom
   public ChatRoom(
       UUID id,
       String name,
-      int shard,
       Clock clock,
       ChatRoomService service,
       int bufferSize)
   {
     this.id = id;
     this.name = name;
-    this.shard = shard;
     this.clock = clock;
     this.service = service;
     this.bufferSize = bufferSize;
index 87fa61fa1329d339e64d1d7258ac69b80a0f8f9d..0beb7f362a872b80bacb9e007d3b51804f4ab1cf 100644 (file)
@@ -2,6 +2,7 @@ package de.juplo.kafka.chat.backend.persistence.inmemory;
 
 import de.juplo.kafka.chat.backend.domain.ChatRoom;
 import de.juplo.kafka.chat.backend.domain.ChatHomeService;
+import de.juplo.kafka.chat.backend.domain.ShardingStrategy;
 import lombok.extern.slf4j.Slf4j;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
@@ -12,15 +13,18 @@ import java.util.*;
 @Slf4j
 public class InMemoryChatHomeService implements ChatHomeService<InMemoryChatRoomService>
 {
+  private final ShardingStrategy shardingStrategy;
   private final Map<UUID, ChatRoom>[] chatrooms;
 
 
   public InMemoryChatHomeService(
+      ShardingStrategy shardingStrategy,
       int numShards,
       int[] ownedShards,
       Flux<ChatRoom> chatroomFlux)
   {
     log.debug("Creating InMemoryChatHomeService");
+    this.shardingStrategy = shardingStrategy;
     this.chatrooms = new Map[numShards];
     Set<Integer> owned = Arrays
         .stream(ownedShards)
@@ -37,7 +41,8 @@ public class InMemoryChatHomeService implements ChatHomeService<InMemoryChatRoom
     chatroomFlux
         .filter(chatRoom ->
         {
-          if (owned.contains(chatRoom.getShard()))
+          int shard = shardingStrategy.selectShard(chatRoom.getId());
+          if (owned.contains(shard))
           {
             return true;
           }
@@ -48,13 +53,16 @@ public class InMemoryChatHomeService implements ChatHomeService<InMemoryChatRoom
           }
         })
         .toStream()
-        .forEach(chatroom -> chatrooms[chatroom.getShard()].put(chatroom.getId(), chatroom));
+        .forEach(chatRoom ->
+        {
+          getChatRoomMapFor(chatRoom).put(chatRoom.getId(), chatRoom);
+        });
   }
 
   @Override
   public Mono<ChatRoom> putChatRoom(ChatRoom chatRoom)
   {
-    chatrooms[chatRoom.getShard()].put(chatRoom.getId(), chatRoom);
+    getChatRoomMapFor(chatRoom).put(chatRoom.getId(), chatRoom);
     return Mono.just(chatRoom);
   }
 
@@ -69,4 +77,11 @@ public class InMemoryChatHomeService implements ChatHomeService<InMemoryChatRoom
   {
     return Flux.fromStream(chatrooms[shard].values().stream());
   }
+
+
+  private Map<UUID, ChatRoom> getChatRoomMapFor(ChatRoom chatRoom)
+  {
+    int shard = shardingStrategy.selectShard(chatRoom.getId());
+    return chatrooms[shard];
+  }
 }
index 7fff359ccebd36e293b3e4d0c4285e96413b1a81..3e1d360307825efd04409c9ccb16afce1e7f564f 100644 (file)
@@ -26,8 +26,7 @@ public class InMemoryChatRoomFactory implements ChatRoomFactory
   public Mono<ChatRoom> createChatRoom(UUID id, String name)
   {
     log.info("Creating ChatRoom with buffer-size {}", bufferSize);
-    int shard = shardingStrategy.selectShard(id);
     ChatRoomService service = new InMemoryChatRoomService(Flux.empty());
-    return Mono.just(new ChatRoom(id, name, shard, clock, service, bufferSize));
+    return Mono.just(new ChatRoom(id, name, clock, service, bufferSize));
   }
 }
index 96ef05c26ddebbe0761b2a764002fead64d76396..1dca04023df825ae14802943e13dc1ea548330e6 100644 (file)
@@ -45,22 +45,23 @@ public class InMemoryServicesConfiguration
       StorageStrategy storageStrategy)
   {
     int numShards = properties.getInmemory().getNumShards();
+    ShardingStrategy shardingStrategy = new KafkaLikeShardingStrategy(numShards);
     SimpleChatHome[] chatHomes = new SimpleChatHome[numShards];
     storageStrategy
         .read()
         .subscribe(chatRoom ->
         {
-          int shard = chatRoom.getShard();
+          int shard = shardingStrategy.selectShard(chatRoom.getId());
           if (chatHomes[shard] == null)
             chatHomes[shard] = new SimpleChatHome(chatHomeService, shard);
         });
-    ShardingStrategy strategy = new KafkaLikeShardingStrategy(numShards);
-    return new ShardedChatHome(chatHomes, strategy);
+    return new ShardedChatHome(chatHomes, shardingStrategy);
   }
 
   @Bean
   InMemoryChatHomeService chatHomeService(
       ChatBackendProperties properties,
+      ShardingStrategy shardingStrategy,
       StorageStrategy storageStrategy)
   {
     ShardingStrategyType sharding =
@@ -72,6 +73,7 @@ public class InMemoryServicesConfiguration
         ? new int[] { 0 }
         : properties.getInmemory().getOwnedShards();
     return new InMemoryChatHomeService(
+        shardingStrategy,
         numShards,
         ownedShards,
         storageStrategy.read());
index df730aafcb8d7899f377181e61f75906f9c6399e..0da1b953b7849bbfe1cb43d2b57b25cf6dc57163 100644 (file)
@@ -2,7 +2,6 @@ package de.juplo.kafka.chat.backend.persistence.storage.files;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import de.juplo.kafka.chat.backend.ChatBackendProperties;
-import de.juplo.kafka.chat.backend.domain.ShardingStrategy;
 import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
 import de.juplo.kafka.chat.backend.persistence.inmemory.InMemoryChatRoomService;
 import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
@@ -32,14 +31,13 @@ public class FilesStorageConfiguration
   public StorageStrategy storageStrategy(
       ChatBackendProperties properties,
       Clock clock,
-      ShardingStrategy shardingStrategy,
       ObjectMapper mapper)
   {
     return new FilesStorageStrategy(
         Paths.get(properties.getInmemory().getStorageDirectory()),
         clock,
         properties.getChatroomBufferSize(),
-        shardingStrategy,
+
         messageFlux -> new InMemoryChatRoomService(messageFlux),
         mapper);
   }
index 1e3e5eef62dc1b077a1334424166fccfb6e2b92c..5d3c067aff5ea4f1f2c85db83fa71a880c8c8290 100644 (file)
@@ -5,7 +5,6 @@ import com.fasterxml.jackson.databind.JavaType;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import de.juplo.kafka.chat.backend.api.ChatRoomTo;
 import de.juplo.kafka.chat.backend.api.MessageTo;
-import de.juplo.kafka.chat.backend.domain.ShardingStrategy;
 import de.juplo.kafka.chat.backend.domain.ChatRoom;
 import de.juplo.kafka.chat.backend.domain.Message;
 import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
@@ -17,7 +16,6 @@ import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.time.Clock;
-import java.util.UUID;
 
 import static java.nio.file.StandardOpenOption.CREATE;
 import static java.nio.file.StandardOpenOption.TRUNCATE_EXISTING;
@@ -33,7 +31,6 @@ public class FilesStorageStrategy implements StorageStrategy
   private final Path storagePath;
   private final Clock clock;
   private final int bufferSize;
-  private final ShardingStrategy shardingStrategy;
   private final ChatRoomServiceFactory factory;
   private final ObjectMapper mapper;
 
@@ -105,18 +102,12 @@ public class FilesStorageStrategy implements StorageStrategy
     return Flux
         .from(new JsonFilePublisher<ChatRoomTo>(chatroomsPath(), mapper, type))
         .log()
-        .map(chatRoomTo ->
-        {
-          UUID chatRoomId = chatRoomTo.getId();
-          int shard = shardingStrategy.selectShard(chatRoomId);
-          return new ChatRoom(
+        .map(chatRoomTo -> new ChatRoom(
               chatRoomTo.getId(),
               chatRoomTo.getName(),
-              shard,
               clock,
               factory.create(readMessages(chatRoomTo)),
-              bufferSize);
-        });
+              bufferSize));
   }
 
   public void writeMessages(ChatRoomTo chatroomTo, Flux<Message> messageFlux)
index 2b33eed72657f579cc5e0138ab83dbb8e333c8e1..22d7c26ba638aa56e083f9bd636e291d86b35367 100644 (file)
@@ -1,7 +1,6 @@
 package de.juplo.kafka.chat.backend.persistence.storage.mongodb;
 
 import de.juplo.kafka.chat.backend.ChatBackendProperties;
-import de.juplo.kafka.chat.backend.domain.ShardingStrategy;
 import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
 import de.juplo.kafka.chat.backend.persistence.inmemory.InMemoryChatRoomService;
 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
@@ -22,14 +21,12 @@ public class MongoDbStorageConfiguration
   public StorageStrategy storageStrategy(
       ChatRoomRepository chatRoomRepository,
       ChatBackendProperties properties,
-      Clock clock,
-      ShardingStrategy shardingStrategy)
+      Clock clock)
   {
     return new MongoDbStorageStrategy(
         chatRoomRepository,
         clock,
         properties.getChatroomBufferSize(),
-        shardingStrategy,
         messageFlux -> new InMemoryChatRoomService(messageFlux));
   }
 }
index d21fe2ba75bdbb7d17fa8d777d3304a33b3ce3a3..230021987eaf678b8588ef33a0b028834dfda616 100644 (file)
@@ -1,6 +1,5 @@
 package de.juplo.kafka.chat.backend.persistence.storage.mongodb;
 
-import de.juplo.kafka.chat.backend.domain.ShardingStrategy;
 import de.juplo.kafka.chat.backend.domain.ChatRoom;
 import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
 import de.juplo.kafka.chat.backend.persistence.storage.files.ChatRoomServiceFactory;
@@ -19,7 +18,6 @@ public class MongoDbStorageStrategy implements StorageStrategy
   private final ChatRoomRepository repository;
   private final Clock clock;
   private final int bufferSize;
-  private final ShardingStrategy shardingStrategy;
   private final ChatRoomServiceFactory factory;
 
 
@@ -39,11 +37,9 @@ public class MongoDbStorageStrategy implements StorageStrategy
         .map(chatRoomTo ->
         {
           UUID chatRoomId = UUID.fromString(chatRoomTo.getId());
-          int shard = shardingStrategy.selectShard(chatRoomId);
           return new ChatRoom(
               chatRoomId,
               chatRoomTo.getName(),
-              shard,
               clock,
               factory.create(
                   Flux
index 1b25a11eb7874581a0501a0aaaead48c98eb4804..a2870284b4e89f2166027830e73d6eed7910ebd9 100644 (file)
@@ -170,7 +170,6 @@ public class ChatBackendControllerTest
     ChatRoom chatRoom = new ChatRoom(
         chatroomId,
         "Test-ChatRoom",
-        0,
         Clock.systemDefaultZone(),
         chatRoomService, 8);
     when(chatHomeService.getChatRoom(anyInt(), any(UUID.class))).thenReturn(Mono.just(chatRoom));
@@ -222,7 +221,6 @@ public class ChatBackendControllerTest
     ChatRoom chatRoom = new ChatRoom(
         chatroomId,
         "Test-ChatRoom",
-        0,
         Clock.systemDefaultZone(),
         chatRoomService, 8);
     when(chatHomeService.getChatRoom(anyInt(), any(UUID.class)))
index 9c418f17474ef5c303210b5aab0c3b9bbce4b8f2..d87155c2044c6486d15ff3eb92d6668b40649db5 100644 (file)
@@ -27,7 +27,6 @@ public class ChatRoomTest
     ChatRoom chatRoom = new ChatRoom(
         UUID.randomUUID(),
         "Foo",
-        0,
         Clock.systemDefaultZone(),
         chatRoomService,
         8);
@@ -54,7 +53,6 @@ public class ChatRoomTest
     ChatRoom chatRoom = new ChatRoom(
         UUID.randomUUID(),
         "Foo",
-        0,
         Clock.systemDefaultZone(),
         chatRoomService,
         8);
@@ -78,7 +76,6 @@ public class ChatRoomTest
     ChatRoom chatRoom = new ChatRoom(
         UUID.randomUUID(),
         "Foo",
-        0,
         Clock.systemDefaultZone(),
         chatRoomService,
         8);
@@ -108,7 +105,6 @@ public class ChatRoomTest
     ChatRoom chatRoom = new ChatRoom(
         UUID.randomUUID(),
         "Foo",
-        0,
         Clock.systemDefaultZone(),
         chatRoomService,
         8);
@@ -138,7 +134,6 @@ public class ChatRoomTest
     ChatRoom chatRoom = new ChatRoom(
         UUID.randomUUID(),
         "Foo",
-        0,
         Clock.systemDefaultZone(),
         chatRoomService,
         8);
index 5b53607f423a2054218b4df27cb6c512e6ab62af..c9d1b633a840f678eadf2890b353b73a7b86e03e 100644 (file)
@@ -25,7 +25,6 @@ public class SimpleChatHomeTest
     ChatRoom chatRoom = new ChatRoom(
         UUID.randomUUID(),
         "Foo",
-        0,
         Clock.systemDefaultZone(),
         mock(ChatRoomService.class),
         8);
index fe7ecac1fe0c74181e3b2349caceb6e6bea85373..d6909df4f4b8f7043d5479cfb3b224cb54a2603e 100644 (file)
@@ -41,7 +41,6 @@ public class InMemoryWithFilesStorageIT extends AbstractStorageStrategyIT
         path,
         clock,
         8,
-        chatRoomId -> 0,
         messageFlux -> new InMemoryChatRoomService(messageFlux),
         mapper);
   }
@@ -57,6 +56,7 @@ public class InMemoryWithFilesStorageIT extends AbstractStorageStrategyIT
   protected Supplier<ChatHomeService> getChatHomeServiceSupplier()
   {
     return () -> new InMemoryChatHomeService(
+        chatRoomId -> 0,
         1,
         new int[] { 0 },
         getStorageStrategy().read());
index 9808aa31d357e988c7e819f7f2537fd5f8d97dda..ecc64d5a125cf12581df84f3bebcef2a58c1ee11 100644 (file)
@@ -55,6 +55,7 @@ public class InMemoryWithMongoDbStorageIT extends AbstractStorageStrategyIT
   protected Supplier<ChatHomeService> getChatHomeServiceSupplier()
   {
     return () -> new InMemoryChatHomeService(
+        chatRoomId -> 0,
         1,
         new int[] { 0 },
         getStorageStrategy().read());
@@ -79,7 +80,6 @@ public class InMemoryWithMongoDbStorageIT extends AbstractStorageStrategyIT
           chatRoomRepository,
           clock,
           8,
-          chatRoomId -> 0,
           messageFlux -> new InMemoryChatRoomService(messageFlux));
     }