feat: Reintroduced `ChatRoom.shard`, becaus it is needed as a routing-hint
authorKai Moritz <kai@juplo.de>
Sat, 14 Jan 2023 18:53:16 +0000 (19:53 +0100)
committerKai Moritz <kai@juplo.de>
Wed, 25 Jan 2023 21:02:41 +0000 (22:02 +0100)
- Technically, the attribute is not needed.
- But, to implement the gua/sha-pattern, it is needed as routing-hing.
- Hence, it is reintroduced here and also added to the `ChatRoomTo` to
  be send to the client.

14 files changed:
src/main/java/de/juplo/kafka/chat/backend/api/ChatRoomTo.java
src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoom.java
src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryChatHomeService.java
src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryChatRoomFactory.java
src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryServicesConfiguration.java
src/main/java/de/juplo/kafka/chat/backend/persistence/storage/files/FilesStorageConfiguration.java
src/main/java/de/juplo/kafka/chat/backend/persistence/storage/files/FilesStorageStrategy.java
src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/MongoDbStorageConfiguration.java
src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/MongoDbStorageStrategy.java
src/test/java/de/juplo/kafka/chat/backend/api/ChatBackendControllerTest.java
src/test/java/de/juplo/kafka/chat/backend/domain/ChatRoomTest.java
src/test/java/de/juplo/kafka/chat/backend/domain/SimpleChatHomeTest.java
src/test/java/de/juplo/kafka/chat/backend/persistence/InMemoryWithFilesStorageIT.java
src/test/java/de/juplo/kafka/chat/backend/persistence/InMemoryWithMongoDbStorageIT.java

index 3cc5921..e997e4b 100644 (file)
@@ -10,6 +10,7 @@ public class ChatRoomTo
 {
   private UUID id;
   private String name;
+  private int shard;
 
 
   public static ChatRoomTo from(ChatRoom chatroom)
@@ -17,6 +18,7 @@ public class ChatRoomTo
     ChatRoomTo to = new ChatRoomTo();
     to.id = chatroom.getId();
     to.name = chatroom.getName();
+    to.shard = chatroom.getShard();
     return to;
   }
 }
index cc5c5a0..4f855b8 100644 (file)
@@ -25,6 +25,8 @@ public class ChatRoom
   private final UUID id;
   @Getter
   private final String name;
+  @Getter
+  private final int shard;
   private final Clock clock;
   private final ChatRoomService service;
   private final int bufferSize;
@@ -34,6 +36,7 @@ public class ChatRoom
   public ChatRoom(
       UUID id,
       String name,
+      int shard,
       Clock clock,
       ChatRoomService service,
       int bufferSize)
@@ -41,6 +44,7 @@ public class ChatRoom
     log.info("Created ChatRoom {} with buffer-size {}", id, bufferSize);
     this.id = id;
     this.name = name;
+    this.shard = shard;
     this.clock = clock;
     this.service = service;
     this.bufferSize = bufferSize;
index 95a30db..fd54d34 100644 (file)
@@ -2,7 +2,6 @@ package de.juplo.kafka.chat.backend.persistence.inmemory;
 
 import de.juplo.kafka.chat.backend.domain.ChatRoom;
 import de.juplo.kafka.chat.backend.domain.ChatHomeService;
-import de.juplo.kafka.chat.backend.domain.ShardingStrategy;
 import lombok.extern.slf4j.Slf4j;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
@@ -13,18 +12,15 @@ import java.util.*;
 @Slf4j
 public class InMemoryChatHomeService implements ChatHomeService
 {
-  private final ShardingStrategy shardingStrategy;
   private final Map<UUID, ChatRoom>[] chatrooms;
 
 
   public InMemoryChatHomeService(
-      ShardingStrategy shardingStrategy,
       int numShards,
       int[] ownedShards,
       Flux<ChatRoom> chatroomFlux)
   {
     log.debug("Creating InMemoryChatHomeService");
-    this.shardingStrategy = shardingStrategy;
     this.chatrooms = new Map[numShards];
     Set<Integer> owned = Arrays
         .stream(ownedShards)
@@ -41,8 +37,7 @@ public class InMemoryChatHomeService implements ChatHomeService
     chatroomFlux
         .filter(chatRoom ->
         {
-          int shard = shardingStrategy.selectShard(chatRoom.getId());
-          if (owned.contains(shard))
+          if (owned.contains(chatRoom.getShard()))
           {
             return true;
           }
@@ -53,16 +48,13 @@ public class InMemoryChatHomeService implements ChatHomeService
           }
         })
         .toStream()
-        .forEach(chatRoom ->
-        {
-          getChatRoomMapFor(chatRoom).put(chatRoom.getId(), chatRoom);
-        });
+        .forEach(chatroom -> chatrooms[chatroom.getShard()].put(chatroom.getId(), chatroom));
   }
 
   @Override
   public Mono<ChatRoom> putChatRoom(ChatRoom chatRoom)
   {
-    getChatRoomMapFor(chatRoom).put(chatRoom.getId(), chatRoom);
+    chatrooms[chatRoom.getShard()].put(chatRoom.getId(), chatRoom);
     return Mono.just(chatRoom);
   }
 
@@ -77,11 +69,4 @@ public class InMemoryChatHomeService implements ChatHomeService
   {
     return Flux.fromStream(chatrooms[shard].values().stream());
   }
-
-
-  private Map<UUID, ChatRoom> getChatRoomMapFor(ChatRoom chatRoom)
-  {
-    int shard = shardingStrategy.selectShard(chatRoom.getId());
-    return chatrooms[shard];
-  }
 }
index 3e1d360..7fff359 100644 (file)
@@ -26,7 +26,8 @@ public class InMemoryChatRoomFactory implements ChatRoomFactory
   public Mono<ChatRoom> createChatRoom(UUID id, String name)
   {
     log.info("Creating ChatRoom with buffer-size {}", bufferSize);
+    int shard = shardingStrategy.selectShard(id);
     ChatRoomService service = new InMemoryChatRoomService(Flux.empty());
-    return Mono.just(new ChatRoom(id, name, clock, service, bufferSize));
+    return Mono.just(new ChatRoom(id, name, shard, clock, service, bufferSize));
   }
 }
index 1dca040..96ef05c 100644 (file)
@@ -45,23 +45,22 @@ public class InMemoryServicesConfiguration
       StorageStrategy storageStrategy)
   {
     int numShards = properties.getInmemory().getNumShards();
-    ShardingStrategy shardingStrategy = new KafkaLikeShardingStrategy(numShards);
     SimpleChatHome[] chatHomes = new SimpleChatHome[numShards];
     storageStrategy
         .read()
         .subscribe(chatRoom ->
         {
-          int shard = shardingStrategy.selectShard(chatRoom.getId());
+          int shard = chatRoom.getShard();
           if (chatHomes[shard] == null)
             chatHomes[shard] = new SimpleChatHome(chatHomeService, shard);
         });
-    return new ShardedChatHome(chatHomes, shardingStrategy);
+    ShardingStrategy strategy = new KafkaLikeShardingStrategy(numShards);
+    return new ShardedChatHome(chatHomes, strategy);
   }
 
   @Bean
   InMemoryChatHomeService chatHomeService(
       ChatBackendProperties properties,
-      ShardingStrategy shardingStrategy,
       StorageStrategy storageStrategy)
   {
     ShardingStrategyType sharding =
@@ -73,7 +72,6 @@ public class InMemoryServicesConfiguration
         ? new int[] { 0 }
         : properties.getInmemory().getOwnedShards();
     return new InMemoryChatHomeService(
-        shardingStrategy,
         numShards,
         ownedShards,
         storageStrategy.read());
index 0da1b95..df730aa 100644 (file)
@@ -2,6 +2,7 @@ package de.juplo.kafka.chat.backend.persistence.storage.files;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import de.juplo.kafka.chat.backend.ChatBackendProperties;
+import de.juplo.kafka.chat.backend.domain.ShardingStrategy;
 import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
 import de.juplo.kafka.chat.backend.persistence.inmemory.InMemoryChatRoomService;
 import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
@@ -31,13 +32,14 @@ public class FilesStorageConfiguration
   public StorageStrategy storageStrategy(
       ChatBackendProperties properties,
       Clock clock,
+      ShardingStrategy shardingStrategy,
       ObjectMapper mapper)
   {
     return new FilesStorageStrategy(
         Paths.get(properties.getInmemory().getStorageDirectory()),
         clock,
         properties.getChatroomBufferSize(),
-
+        shardingStrategy,
         messageFlux -> new InMemoryChatRoomService(messageFlux),
         mapper);
   }
index 5d3c067..1e3e5ee 100644 (file)
@@ -5,6 +5,7 @@ import com.fasterxml.jackson.databind.JavaType;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import de.juplo.kafka.chat.backend.api.ChatRoomTo;
 import de.juplo.kafka.chat.backend.api.MessageTo;
+import de.juplo.kafka.chat.backend.domain.ShardingStrategy;
 import de.juplo.kafka.chat.backend.domain.ChatRoom;
 import de.juplo.kafka.chat.backend.domain.Message;
 import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
@@ -16,6 +17,7 @@ import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.time.Clock;
+import java.util.UUID;
 
 import static java.nio.file.StandardOpenOption.CREATE;
 import static java.nio.file.StandardOpenOption.TRUNCATE_EXISTING;
@@ -31,6 +33,7 @@ public class FilesStorageStrategy implements StorageStrategy
   private final Path storagePath;
   private final Clock clock;
   private final int bufferSize;
+  private final ShardingStrategy shardingStrategy;
   private final ChatRoomServiceFactory factory;
   private final ObjectMapper mapper;
 
@@ -102,12 +105,18 @@ public class FilesStorageStrategy implements StorageStrategy
     return Flux
         .from(new JsonFilePublisher<ChatRoomTo>(chatroomsPath(), mapper, type))
         .log()
-        .map(chatRoomTo -> new ChatRoom(
+        .map(chatRoomTo ->
+        {
+          UUID chatRoomId = chatRoomTo.getId();
+          int shard = shardingStrategy.selectShard(chatRoomId);
+          return new ChatRoom(
               chatRoomTo.getId(),
               chatRoomTo.getName(),
+              shard,
               clock,
               factory.create(readMessages(chatRoomTo)),
-              bufferSize));
+              bufferSize);
+        });
   }
 
   public void writeMessages(ChatRoomTo chatroomTo, Flux<Message> messageFlux)
index 22d7c26..2b33eed 100644 (file)
@@ -1,6 +1,7 @@
 package de.juplo.kafka.chat.backend.persistence.storage.mongodb;
 
 import de.juplo.kafka.chat.backend.ChatBackendProperties;
+import de.juplo.kafka.chat.backend.domain.ShardingStrategy;
 import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
 import de.juplo.kafka.chat.backend.persistence.inmemory.InMemoryChatRoomService;
 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
@@ -21,12 +22,14 @@ public class MongoDbStorageConfiguration
   public StorageStrategy storageStrategy(
       ChatRoomRepository chatRoomRepository,
       ChatBackendProperties properties,
-      Clock clock)
+      Clock clock,
+      ShardingStrategy shardingStrategy)
   {
     return new MongoDbStorageStrategy(
         chatRoomRepository,
         clock,
         properties.getChatroomBufferSize(),
+        shardingStrategy,
         messageFlux -> new InMemoryChatRoomService(messageFlux));
   }
 }
index 2300219..d21fe2b 100644 (file)
@@ -1,5 +1,6 @@
 package de.juplo.kafka.chat.backend.persistence.storage.mongodb;
 
+import de.juplo.kafka.chat.backend.domain.ShardingStrategy;
 import de.juplo.kafka.chat.backend.domain.ChatRoom;
 import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
 import de.juplo.kafka.chat.backend.persistence.storage.files.ChatRoomServiceFactory;
@@ -18,6 +19,7 @@ public class MongoDbStorageStrategy implements StorageStrategy
   private final ChatRoomRepository repository;
   private final Clock clock;
   private final int bufferSize;
+  private final ShardingStrategy shardingStrategy;
   private final ChatRoomServiceFactory factory;
 
 
@@ -37,9 +39,11 @@ public class MongoDbStorageStrategy implements StorageStrategy
         .map(chatRoomTo ->
         {
           UUID chatRoomId = UUID.fromString(chatRoomTo.getId());
+          int shard = shardingStrategy.selectShard(chatRoomId);
           return new ChatRoom(
               chatRoomId,
               chatRoomTo.getName(),
+              shard,
               clock,
               factory.create(
                   Flux
index a287028..1b25a11 100644 (file)
@@ -170,6 +170,7 @@ public class ChatBackendControllerTest
     ChatRoom chatRoom = new ChatRoom(
         chatroomId,
         "Test-ChatRoom",
+        0,
         Clock.systemDefaultZone(),
         chatRoomService, 8);
     when(chatHomeService.getChatRoom(anyInt(), any(UUID.class))).thenReturn(Mono.just(chatRoom));
@@ -221,6 +222,7 @@ public class ChatBackendControllerTest
     ChatRoom chatRoom = new ChatRoom(
         chatroomId,
         "Test-ChatRoom",
+        0,
         Clock.systemDefaultZone(),
         chatRoomService, 8);
     when(chatHomeService.getChatRoom(anyInt(), any(UUID.class)))
index d87155c..9c418f1 100644 (file)
@@ -27,6 +27,7 @@ public class ChatRoomTest
     ChatRoom chatRoom = new ChatRoom(
         UUID.randomUUID(),
         "Foo",
+        0,
         Clock.systemDefaultZone(),
         chatRoomService,
         8);
@@ -53,6 +54,7 @@ public class ChatRoomTest
     ChatRoom chatRoom = new ChatRoom(
         UUID.randomUUID(),
         "Foo",
+        0,
         Clock.systemDefaultZone(),
         chatRoomService,
         8);
@@ -76,6 +78,7 @@ public class ChatRoomTest
     ChatRoom chatRoom = new ChatRoom(
         UUID.randomUUID(),
         "Foo",
+        0,
         Clock.systemDefaultZone(),
         chatRoomService,
         8);
@@ -105,6 +108,7 @@ public class ChatRoomTest
     ChatRoom chatRoom = new ChatRoom(
         UUID.randomUUID(),
         "Foo",
+        0,
         Clock.systemDefaultZone(),
         chatRoomService,
         8);
@@ -134,6 +138,7 @@ public class ChatRoomTest
     ChatRoom chatRoom = new ChatRoom(
         UUID.randomUUID(),
         "Foo",
+        0,
         Clock.systemDefaultZone(),
         chatRoomService,
         8);
index c9d1b63..5b53607 100644 (file)
@@ -25,6 +25,7 @@ public class SimpleChatHomeTest
     ChatRoom chatRoom = new ChatRoom(
         UUID.randomUUID(),
         "Foo",
+        0,
         Clock.systemDefaultZone(),
         mock(ChatRoomService.class),
         8);
index d6909df..fe7ecac 100644 (file)
@@ -41,6 +41,7 @@ public class InMemoryWithFilesStorageIT extends AbstractStorageStrategyIT
         path,
         clock,
         8,
+        chatRoomId -> 0,
         messageFlux -> new InMemoryChatRoomService(messageFlux),
         mapper);
   }
@@ -56,7 +57,6 @@ public class InMemoryWithFilesStorageIT extends AbstractStorageStrategyIT
   protected Supplier<ChatHomeService> getChatHomeServiceSupplier()
   {
     return () -> new InMemoryChatHomeService(
-        chatRoomId -> 0,
         1,
         new int[] { 0 },
         getStorageStrategy().read());
index ecc64d5..9808aa3 100644 (file)
@@ -55,7 +55,6 @@ public class InMemoryWithMongoDbStorageIT extends AbstractStorageStrategyIT
   protected Supplier<ChatHomeService> getChatHomeServiceSupplier()
   {
     return () -> new InMemoryChatHomeService(
-        chatRoomId -> 0,
         1,
         new int[] { 0 },
         getStorageStrategy().read());
@@ -80,6 +79,7 @@ public class InMemoryWithMongoDbStorageIT extends AbstractStorageStrategyIT
           chatRoomRepository,
           clock,
           8,
+          chatRoomId -> 0,
           messageFlux -> new InMemoryChatRoomService(messageFlux));
     }