feat: Introduced a kafka-like `ShardingStrategy` for `inmemory`
authorKai Moritz <kai@juplo.de>
Fri, 13 Jan 2023 23:05:31 +0000 (00:05 +0100)
committerKai Moritz <kai@juplo.de>
Wed, 25 Jan 2023 21:00:58 +0000 (22:00 +0100)
- Introduced the `ShardingStrategy`, that picks a shard for a given
  `ChatRoom`-ID.
  - Implemented a `KafkaLikeShardingStrategy`, that reuses the hashing
    algorithm, that is implementd in `Utils.murmur2()` in the
    `org.apache.kafka:kafka-clients` library.
  - The attribute `shard` of the `ChatHome` has to be restored according
    to the configured `ShardingStrategy` when loading the state - it must
    not be safed with the stored data, because it might change due to
    configuration-changes.
- The `ChatBackendController` was not configured correctly, because it
  had consumed the single `ChatHome` from the old configuration as the
  only entry in its `ChatHome[]`-array.
  - Refined the application-properties: Introduced an inner subclass
    `InMemoryServicesProperties` of `ChatBackendProperties`, that
    encapsulates the properties, that only concern the implementation
    `inmemory`.
  - Added the configuration-parameters `numShards` and `ownedShards`,
    that are needed by `inmemory`, to handle the sharding correctly.
- Introduced `ChatHomeFactory`, because the `ChatHome`s are instanciated
  by `ChatBackendConfiguration`, which is not aware of the configured
  implementation.
- Adjusted the test-cases to the changes.
- Added `InMemoryWithFilesAndShardingConfigurationIT`, that asserts,
  that the application works as expected, if sharding is activated.

24 files changed:
pom.xml
src/main/java/de/juplo/kafka/chat/backend/ChatBackendApplication.java
src/main/java/de/juplo/kafka/chat/backend/ChatBackendConfiguration.java
src/main/java/de/juplo/kafka/chat/backend/ChatBackendProperties.java
src/main/java/de/juplo/kafka/chat/backend/api/ChatRoomTo.java
src/main/java/de/juplo/kafka/chat/backend/api/KafkaLikeShardingStrategy.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/chat/backend/domain/ChatHomeFactory.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryChatHomeFactory.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryChatHomeService.java
src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryServicesConfiguration.java
src/main/java/de/juplo/kafka/chat/backend/persistence/storage/files/FilesStorageConfiguration.java
src/main/java/de/juplo/kafka/chat/backend/persistence/storage/files/FilesStorageStrategy.java
src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/ChatRoomTo.java
src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/MongoDbStorageConfiguration.java
src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/MongoDbStorageStrategy.java
src/main/resources/application.yml
src/test/java/de/juplo/kafka/chat/backend/InMemoryWithFilesAndShardingConfigurationIT.java [new file with mode: 0644]
src/test/java/de/juplo/kafka/chat/backend/InMemoryWithFilesConfigurationIT.java
src/test/java/de/juplo/kafka/chat/backend/InMemoryWithMongoDbConfigurationIT.java
src/test/java/de/juplo/kafka/chat/backend/api/ChatBackendControllerTest.java
src/test/java/de/juplo/kafka/chat/backend/persistence/InMemoryWithFilesStorageIT.java
src/test/java/de/juplo/kafka/chat/backend/persistence/InMemoryWithMongoDbStorageIT.java
src/test/resources/data/files/chatrooms.json
src/test/resources/data/mongodb/chatRoomTo.json

diff --git a/pom.xml b/pom.xml
index e10af5e..b5b0e81 100644 (file)
--- a/pom.xml
+++ b/pom.xml
       <groupId>org.springframework.boot</groupId>
       <artifactId>spring-boot-starter-data-mongodb</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.apache.kafka</groupId>
+      <artifactId>kafka-clients</artifactId>
+    </dependency>
     <dependency>
       <groupId>org.springframework.boot</groupId>
       <artifactId>spring-boot-starter-test</artifactId>
index c61f848..8e1ff9e 100644 (file)
@@ -16,7 +16,7 @@ public class ChatBackendApplication implements WebFluxConfigurer
        @Autowired
        ChatBackendProperties properties;
        @Autowired
-       ChatHome chatHome;
+       ChatHome[] chatHomes;
        @Autowired
        StorageStrategy storageStrategy;
 
@@ -32,7 +32,8 @@ public class ChatBackendApplication implements WebFluxConfigurer
        @PreDestroy
        public void onExit()
        {
-               storageStrategy.write(chatHome.getChatRooms());
+               for (int shard = 0; shard < chatHomes.length; shard++)
+                       storageStrategy.write(chatHomes[shard].getChatRooms());
        }
 
        public static void main(String[] args)
index c196b7e..6e74bb0 100644 (file)
@@ -1,7 +1,9 @@
 package de.juplo.kafka.chat.backend;
 
 import de.juplo.kafka.chat.backend.domain.ChatHome;
+import de.juplo.kafka.chat.backend.domain.ChatHomeFactory;
 import de.juplo.kafka.chat.backend.domain.ChatHomeService;
+import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
 import org.springframework.boot.context.properties.EnableConfigurationProperties;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
@@ -14,13 +16,25 @@ import java.time.Clock;
 public class ChatBackendConfiguration
 {
   @Bean
-  public ChatHome chatHome(ChatHomeService chatHomeService)
+  ChatHome[] chatHomes(
+      ChatHomeFactory factory,
+      ChatBackendProperties properties,
+      StorageStrategy storageStrategy)
   {
-    return new ChatHome(chatHomeService, 0);
+    ChatHome[] chatHomes = new ChatHome[properties.getInmemory().getNumShards()];
+    storageStrategy
+        .read()
+        .subscribe(chatRoom ->
+        {
+          int shard = chatRoom.getShard();
+          if (chatHomes[shard] == null)
+            chatHomes[shard] = factory.createChatHome(shard);
+        });
+    return chatHomes;
   }
 
   @Bean
-  public Clock clock()
+  Clock clock()
   {
     return Clock.systemDefaultZone();
   }
index 48e5816..5cd4535 100644 (file)
@@ -12,7 +12,24 @@ import java.nio.file.Paths;
 @Setter
 public class ChatBackendProperties
 {
-  private String storageDirectory = Paths.get(System.getProperty("java.io.tmpdir"),"chat", "backend").toString();
   private String allowedOrigins = "http://localhost:4200";
   private int chatroomBufferSize = 8;
+  private ServiceType services = ServiceType.inmemory;
+  private InMemoryServicesProperties inmemory = new InMemoryServicesProperties();
+
+
+  @Getter
+  @Setter
+  public static class InMemoryServicesProperties
+  {
+    private ShardingStrategyType shardingStrategy = ShardingStrategyType.kafkalike;
+    private int numShards = 10;
+    private int[] ownedShards = { 2 };
+    private StorageStrategyType storageStrategy = StorageStrategyType.files;
+    private String storageDirectory = Paths.get(System.getProperty("java.io.tmpdir"),"chat", "backend").toString();
+  }
+
+  public enum ServiceType { inmemory }
+  public enum StorageStrategyType { files, mongodb }
+  public enum ShardingStrategyType { none, kafkalike }
 }
index e997e4b..3cc5921 100644 (file)
@@ -10,7 +10,6 @@ public class ChatRoomTo
 {
   private UUID id;
   private String name;
-  private int shard;
 
 
   public static ChatRoomTo from(ChatRoom chatroom)
@@ -18,7 +17,6 @@ public class ChatRoomTo
     ChatRoomTo to = new ChatRoomTo();
     to.id = chatroom.getId();
     to.name = chatroom.getName();
-    to.shard = chatroom.getShard();
     return to;
   }
 }
diff --git a/src/main/java/de/juplo/kafka/chat/backend/api/KafkaLikeShardingStrategy.java b/src/main/java/de/juplo/kafka/chat/backend/api/KafkaLikeShardingStrategy.java
new file mode 100644 (file)
index 0000000..d06ee58
--- /dev/null
@@ -0,0 +1,20 @@
+package de.juplo.kafka.chat.backend.api;
+
+import lombok.RequiredArgsConstructor;
+import org.apache.kafka.common.utils.Utils;
+
+import java.util.UUID;
+
+
+@RequiredArgsConstructor
+public class KafkaLikeShardingStrategy implements ShardingStrategy
+{
+  private final int numPartitions;
+
+  @Override
+  public int selectShard(UUID chatRoomId)
+  {
+    byte[] serializedKey = chatRoomId.toString().getBytes();
+    return Utils.toPositive(Utils.murmur2(serializedKey)) % numPartitions;
+  }
+}
diff --git a/src/main/java/de/juplo/kafka/chat/backend/domain/ChatHomeFactory.java b/src/main/java/de/juplo/kafka/chat/backend/domain/ChatHomeFactory.java
new file mode 100644 (file)
index 0000000..fb20cc7
--- /dev/null
@@ -0,0 +1,6 @@
+package de.juplo.kafka.chat.backend.domain;
+
+public interface ChatHomeFactory
+{
+  ChatHome createChatHome(int shard);
+}
diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryChatHomeFactory.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryChatHomeFactory.java
new file mode 100644 (file)
index 0000000..d2c6b63
--- /dev/null
@@ -0,0 +1,19 @@
+package de.juplo.kafka.chat.backend.persistence.inmemory;
+
+import de.juplo.kafka.chat.backend.domain.ChatHome;
+import de.juplo.kafka.chat.backend.domain.ChatHomeFactory;
+import lombok.RequiredArgsConstructor;
+
+
+@RequiredArgsConstructor
+public class InMemoryChatHomeFactory implements ChatHomeFactory
+{
+  private final InMemoryChatHomeService service;
+
+
+  @Override
+  public ChatHome createChatHome(int shard)
+  {
+    return new ChatHome(service, shard);
+  }
+}
index acfd936..87fa61f 100644 (file)
@@ -6,9 +6,7 @@ import lombok.extern.slf4j.Slf4j;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
-import java.util.HashMap;
-import java.util.Map;
-import java.util.UUID;
+import java.util.*;
 
 
 @Slf4j
@@ -17,13 +15,38 @@ public class InMemoryChatHomeService implements ChatHomeService<InMemoryChatRoom
   private final Map<UUID, ChatRoom>[] chatrooms;
 
 
-  public InMemoryChatHomeService(int numShards, Flux<ChatRoom> chatroomFlux)
+  public InMemoryChatHomeService(
+      int numShards,
+      int[] ownedShards,
+      Flux<ChatRoom> chatroomFlux)
   {
     log.debug("Creating InMemoryChatHomeService");
     this.chatrooms = new Map[numShards];
+    Set<Integer> owned = Arrays
+        .stream(ownedShards)
+        .collect(
+            () -> new HashSet<>(),
+            (set, i) -> set.add(i),
+            (a, b) -> a.addAll(b));
     for (int shard = 0; shard < numShards; shard++)
-        chatrooms[shard] = new HashMap<>();
+    {
+      chatrooms[shard] = owned.contains(shard)
+          ? new HashMap<>()
+          : null;
+    }
     chatroomFlux
+        .filter(chatRoom ->
+        {
+          if (owned.contains(chatRoom.getShard()))
+          {
+            return true;
+          }
+          else
+          {
+            log.info("Ignoring not owned chat-room {}", chatRoom);
+            return false;
+          }
+        })
         .toStream()
         .forEach(chatroom -> chatrooms[chatroom.getShard()].put(chatroom.getId(), chatroom));
   }
index e21ead5..94dd5d8 100644 (file)
@@ -1,6 +1,7 @@
 package de.juplo.kafka.chat.backend.persistence.inmemory;
 
 import de.juplo.kafka.chat.backend.ChatBackendProperties;
+import de.juplo.kafka.chat.backend.api.KafkaLikeShardingStrategy;
 import de.juplo.kafka.chat.backend.api.ShardingStrategy;
 import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
@@ -13,15 +14,26 @@ import java.time.Clock;
 @ConditionalOnProperty(
     prefix = "chat.backend",
     name = "services",
-    havingValue = "in-memory",
+    havingValue = "inmemory",
     matchIfMissing = true)
 @Configuration
 public class InMemoryServicesConfiguration
 {
   @Bean
-  InMemoryChatHomeService chatHomeService(StorageStrategy storageStrategy)
+  InMemoryChatHomeService chatHomeService(
+      ChatBackendProperties properties,
+      StorageStrategy storageStrategy)
   {
-    return new InMemoryChatHomeService(1, storageStrategy.read());
+    return new InMemoryChatHomeService(
+        properties.getInmemory().getNumShards(),
+        properties.getInmemory().getOwnedShards(),
+        storageStrategy.read());
+  }
+
+  @Bean
+  InMemoryChatHomeFactory chatHomeFactory(InMemoryChatHomeService service)
+  {
+    return new InMemoryChatHomeFactory(service);
   }
 
   @Bean
@@ -36,9 +48,25 @@ public class InMemoryServicesConfiguration
         properties.getChatroomBufferSize());
   }
 
+  @ConditionalOnProperty(
+      prefix = "chat.backend.inmemory",
+      name = "sharding-strategy",
+      havingValue = "none",
+      matchIfMissing = true)
   @Bean
-  ShardingStrategy shardingStrategy()
+  ShardingStrategy defaultShardingStrategy()
   {
     return chatRoomId -> 0;
   }
+
+  @ConditionalOnProperty(
+      prefix = "chat.backend.inmemory",
+      name = "sharding-strategy",
+      havingValue = "kafkalike")
+  @Bean
+  ShardingStrategy kafkalikeShardingStrategy(ChatBackendProperties properties)
+  {
+    return new KafkaLikeShardingStrategy(
+        properties.getInmemory().getNumShards());
+  }
 }
index 31e285c..c7e5643 100644 (file)
@@ -2,6 +2,7 @@ package de.juplo.kafka.chat.backend.persistence.storage.files;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import de.juplo.kafka.chat.backend.ChatBackendProperties;
+import de.juplo.kafka.chat.backend.api.ShardingStrategy;
 import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
 import de.juplo.kafka.chat.backend.persistence.inmemory.InMemoryChatRoomService;
 import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
@@ -16,8 +17,8 @@ import java.time.Clock;
 
 
 @ConditionalOnProperty(
-    prefix = "chat.backend",
-    name = "storage",
+    prefix = "chat.backend.inmemory",
+    name = "storage-strategy",
     havingValue = "files",
     matchIfMissing = true)
 @Configuration
@@ -31,12 +32,14 @@ public class FilesStorageConfiguration
   public StorageStrategy storageStrategy(
       ChatBackendProperties properties,
       Clock clock,
+      ShardingStrategy shardingStrategy,
       ObjectMapper mapper)
   {
     return new FilesStorageStrategy(
-        Paths.get(properties.getStorageDirectory()),
+        Paths.get(properties.getInmemory().getStorageDirectory()),
         clock,
         properties.getChatroomBufferSize(),
+        shardingStrategy,
         messageFlux -> new InMemoryChatRoomService(messageFlux),
         mapper);
   }
index d043696..24c6a01 100644 (file)
@@ -5,6 +5,7 @@ import com.fasterxml.jackson.databind.JavaType;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import de.juplo.kafka.chat.backend.api.ChatRoomTo;
 import de.juplo.kafka.chat.backend.api.MessageTo;
+import de.juplo.kafka.chat.backend.api.ShardingStrategy;
 import de.juplo.kafka.chat.backend.domain.ChatRoom;
 import de.juplo.kafka.chat.backend.domain.Message;
 import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
@@ -16,6 +17,7 @@ import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.time.Clock;
+import java.util.UUID;
 
 import static java.nio.file.StandardOpenOption.CREATE;
 import static java.nio.file.StandardOpenOption.TRUNCATE_EXISTING;
@@ -31,6 +33,7 @@ public class FilesStorageStrategy implements StorageStrategy
   private final Path storagePath;
   private final Clock clock;
   private final int bufferSize;
+  private final ShardingStrategy shardingStrategy;
   private final ChatRoomServiceFactory factory;
   private final ObjectMapper mapper;
 
@@ -102,13 +105,18 @@ public class FilesStorageStrategy implements StorageStrategy
     return Flux
         .from(new JsonFilePublisher<ChatRoomTo>(chatroomsPath(), mapper, type))
         .log()
-        .map(chatRoomTo -> new ChatRoom(
-            chatRoomTo.getId(),
-            chatRoomTo.getName(),
-            chatRoomTo.getShard(),
-            clock,
-            factory.create(readMessages(chatRoomTo)),
-            bufferSize));
+        .map(chatRoomTo ->
+        {
+          UUID chatRoomId = chatRoomTo.getId();
+          int shard = shardingStrategy.selectShard(chatRoomId);
+          return new ChatRoom(
+              chatRoomTo.getId(),
+              chatRoomTo.getName(),
+              shard,
+              clock,
+              factory.create(readMessages(chatRoomTo)),
+              bufferSize);
+        });
   }
 
   public void writeMessages(ChatRoomTo chatroomTo, Flux<Message> messageFlux)
index 27e65e4..1ad8d17 100644 (file)
@@ -20,7 +20,6 @@ public class ChatRoomTo
   @Id
   private String id;
   private String name;
-  private int shard;
   private List<MessageTo> messages;
 
   public static ChatRoomTo from(ChatRoom chatroom)
@@ -28,7 +27,6 @@ public class ChatRoomTo
     return new ChatRoomTo(
         chatroom.getId().toString(),
         chatroom.getName(),
-        chatroom.getShard(),
         chatroom
             .getMessages()
             .map(MessageTo::from)
index 0a3df75..862dac0 100644 (file)
@@ -1,6 +1,7 @@
 package de.juplo.kafka.chat.backend.persistence.storage.mongodb;
 
 import de.juplo.kafka.chat.backend.ChatBackendProperties;
+import de.juplo.kafka.chat.backend.api.ShardingStrategy;
 import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
 import de.juplo.kafka.chat.backend.persistence.inmemory.InMemoryChatRoomService;
 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
@@ -11,8 +12,8 @@ import java.time.Clock;
 
 
 @ConditionalOnProperty(
-    prefix = "chat.backend",
-    name = "storage",
+    prefix = "chat.backend.inmemory",
+    name = "storage-strategy",
     havingValue = "mongodb")
 @Configuration
 public class MongoDbStorageConfiguration
@@ -21,12 +22,14 @@ public class MongoDbStorageConfiguration
   public StorageStrategy storageStrategy(
       ChatRoomRepository chatRoomRepository,
       ChatBackendProperties properties,
-      Clock clock)
+      Clock clock,
+      ShardingStrategy shardingStrategy)
   {
     return new MongoDbStorageStrategy(
         chatRoomRepository,
         clock,
         properties.getChatroomBufferSize(),
+        shardingStrategy,
         messageFlux -> new InMemoryChatRoomService(messageFlux));
   }
 }
index 8429fe8..ec90e94 100644 (file)
@@ -1,5 +1,6 @@
 package de.juplo.kafka.chat.backend.persistence.storage.mongodb;
 
+import de.juplo.kafka.chat.backend.api.ShardingStrategy;
 import de.juplo.kafka.chat.backend.domain.ChatRoom;
 import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
 import de.juplo.kafka.chat.backend.persistence.storage.files.ChatRoomServiceFactory;
@@ -18,6 +19,7 @@ public class MongoDbStorageStrategy implements StorageStrategy
   private final ChatRoomRepository repository;
   private final Clock clock;
   private final int bufferSize;
+  private final ShardingStrategy shardingStrategy;
   private final ChatRoomServiceFactory factory;
 
 
@@ -34,15 +36,20 @@ public class MongoDbStorageStrategy implements StorageStrategy
   {
     return Flux
         .fromIterable(repository.findAll())
-        .map(chatRoomTo -> new ChatRoom(
-            UUID.fromString(chatRoomTo.getId()),
-            chatRoomTo.getName(),
-            chatRoomTo.getShard(),
-            clock,
-            factory.create(
-                Flux
-                    .fromIterable(chatRoomTo.getMessages())
-                    .map(messageTo -> messageTo.toMessage())),
-            bufferSize));
+        .map(chatRoomTo ->
+        {
+          UUID chatRoomId = UUID.fromString(chatRoomTo.getId());
+          int shard = shardingStrategy.selectShard(chatRoomId);
+          return new ChatRoom(
+              chatRoomId,
+              chatRoomTo.getName(),
+              shard,
+              clock,
+              factory.create(
+                  Flux
+                      .fromIterable(chatRoomTo.getMessages())
+                      .map(messageTo -> messageTo.toMessage())),
+              bufferSize);
+        });
   }
 }
diff --git a/src/test/java/de/juplo/kafka/chat/backend/InMemoryWithFilesAndShardingConfigurationIT.java b/src/test/java/de/juplo/kafka/chat/backend/InMemoryWithFilesAndShardingConfigurationIT.java
new file mode 100644 (file)
index 0000000..5138dee
--- /dev/null
@@ -0,0 +1,13 @@
+package de.juplo.kafka.chat.backend;
+
+import org.springframework.boot.test.context.SpringBootTest;
+
+
+@SpringBootTest(
+    webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT,
+    properties = {
+        "chat.backend.inmemory.storage-directory=target/test-classes/data/files",
+        "chat.backend.inmemory.sharding-strategy=kafkalike" })
+class InMemoryWithFilesAndShardingConfigurationIT extends AbstractConfigurationIT
+{
+}
index 0065219..9027c6f 100644 (file)
@@ -5,7 +5,11 @@ import org.springframework.boot.test.context.SpringBootTest;
 
 @SpringBootTest(
                webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT,
-               properties = "chat.backend.storage-directory=target/test-classes/data/files")
+               properties = {
+                               "chat.backend.inmemory.sharding-strategy=none",
+                               "chat.backend.inmemory.num-shards=1",
+                               "chat.backend.inmemory.owned-shards=0",
+                               "chat.backend.inmemory.storage-directory=target/test-classes/data/files" })
 class InMemoryWithFilesConfigurationIT extends AbstractConfigurationIT
 {
 }
index abb0956..05f1de5 100644 (file)
@@ -15,9 +15,10 @@ import org.testcontainers.junit.jupiter.Testcontainers;
 @SpringBootTest(
                webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT,
                properties = {
-                               "chat.backend.storage=mongodb",
                                "spring.data.mongodb.host=localhost",
-                               "spring.data.mongodb.database=test" })
+                               "spring.data.mongodb.database=test",
+                               "chat.backend.inmemory.sharding-strategy=kafkalike",
+                               "chat.backend.inmemory.storage-strategy=mongodb" })
 @Testcontainers
 @Slf4j
 class InMemoryWithMongoDbConfigurationIT extends AbstractConfigurationIT
index b3504d2..12d34be 100644 (file)
@@ -1,32 +1,39 @@
 package de.juplo.kafka.chat.backend.api;
 
+import de.juplo.kafka.chat.backend.ChatBackendProperties;
 import de.juplo.kafka.chat.backend.domain.*;
+import de.juplo.kafka.chat.backend.persistence.inmemory.InMemoryChatHomeService;
 import lombok.extern.slf4j.Slf4j;
 import org.junit.jupiter.api.DisplayName;
 import org.junit.jupiter.api.Test;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.boot.test.autoconfigure.web.reactive.AutoConfigureWebTestClient;
 import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.boot.test.context.TestConfiguration;
 import org.springframework.boot.test.mock.mockito.MockBean;
+import org.springframework.context.annotation.Bean;
 import org.springframework.http.MediaType;
 import org.springframework.test.web.reactive.server.WebTestClient;
 import reactor.core.publisher.Mono;
 
 import java.time.Clock;
 import java.time.LocalDateTime;
+import java.util.Arrays;
 import java.util.UUID;
 
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.*;
 
 
-@SpringBootTest(properties = "spring.main.allow-bean-definition-overriding=true")
+@SpringBootTest(properties = {
+    "spring.main.allow-bean-definition-overriding=true",
+    "chat.backend.inmemory.owned-shards=0,1,2,3,4,5,6,7,8,9" })
 @AutoConfigureWebTestClient
 @Slf4j
 public class ChatBackendControllerTest
 {
   @MockBean
-  ChatHomeService chatHomeService;
+  InMemoryChatHomeService chatHomeService;
   @MockBean
   ChatRoomService chatRoomService;
 
@@ -248,4 +255,20 @@ public class ChatBackendControllerTest
         .jsonPath("$.username").isEqualTo(user);
     verify(chatRoomService, never()).persistMessage(eq(key), any(LocalDateTime.class), any(String.class));
   }
+
+  @TestConfiguration
+  static class Config
+  {
+    @Bean
+    ChatHome[] chatHomes(
+        ChatBackendProperties properties,
+        ChatHomeFactory factory)
+    {
+      ChatHome[] chatHomes = new ChatHome[properties.getInmemory().getNumShards()];
+      Arrays
+          .stream(properties.getInmemory().getOwnedShards())
+          .forEach(i -> chatHomes[i] = factory.createChatHome(i));
+      return chatHomes;
+    }
+  }
 }
index 2c72ea0..8972b04 100644 (file)
@@ -41,6 +41,7 @@ public class InMemoryWithFilesStorageIT extends AbstractStorageStrategyIT
         path,
         clock,
         8,
+        chatRoomId -> 0,
         messageFlux -> new InMemoryChatRoomService(messageFlux),
         mapper);
   }
@@ -55,7 +56,10 @@ public class InMemoryWithFilesStorageIT extends AbstractStorageStrategyIT
   @Override
   protected Supplier<ChatHomeService> getChatHomeServiceSupplier()
   {
-    return () -> new InMemoryChatHomeService(1, getStorageStrategy().read());
+    return () -> new InMemoryChatHomeService(
+        1,
+        new int[] { 0 },
+        getStorageStrategy().read());
   }
 
   @Override
index 8006711..34e2126 100644 (file)
@@ -54,7 +54,10 @@ public class InMemoryWithMongoDbStorageIT extends AbstractStorageStrategyIT
   @Override
   protected Supplier<ChatHomeService> getChatHomeServiceSupplier()
   {
-    return () -> new InMemoryChatHomeService(1, getStorageStrategy().read());
+    return () -> new InMemoryChatHomeService(
+        1,
+        new int[] { 0 },
+        getStorageStrategy().read());
   }
 
   @Override
@@ -76,6 +79,7 @@ public class InMemoryWithMongoDbStorageIT extends AbstractStorageStrategyIT
           chatRoomRepository,
           clock,
           8,
+          chatRoomId -> 0,
           messageFlux -> new InMemoryChatRoomService(messageFlux));
     }
 
index d0f1b92..a2ce941 100644 (file)
@@ -1,5 +1,4 @@
 [ {
   "id" : "5c73531c-6fc4-426c-adcb-afc5c140a0f7",
-  "name" : "FOO",
-  "shard" : 0
-} ]
\ No newline at end of file
+  "name" : "FOO"
+} ]
index 5d1bee0..ae15034 100644 (file)
@@ -1,7 +1,6 @@
 {
   "_id": "5c73531c-6fc4-426c-adcb-afc5c140a0f7",
   "name": "FOO",
-  "shard": 0,
   "messages": [
     {
       "_id": "peter--1",