+package de.juplo.kafka.chat.backend.persistence.inmemory;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import de.juplo.kafka.chat.backend.domain.ChatHomeWithShardsTestBase;
+import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
+import de.juplo.kafka.chat.backend.persistence.storage.files.FilesStorageStrategy;
+import org.springframework.boot.test.context.TestConfiguration;
+import org.springframework.context.annotation.Bean;
+
+import java.nio.file.Paths;
+import java.time.Clock;
+import java.util.stream.IntStream;
+
+
+public class ShardedChatHomeTest extends ChatHomeWithShardsTestBase
+{
+ @TestConfiguration
+ static class Configuration
+ {
+ @Bean
+ ShardedChatHome chatHome(
+ Integer numShards,
+ int[] ownedShards,
+ InMemoryChatHomeService chatHomeService)
+ {
+ SimpleChatHome[] chatHomes = new SimpleChatHome[numShards];
+
+ IntStream
+ .of(ownedShards)
+ .forEach(shard -> chatHomes[shard] = new SimpleChatHome(chatHomeService, shard));
+
+ ShardingStrategy strategy = new KafkaLikeShardingStrategy(numShards);
+
+ return new ShardedChatHome(chatHomes, strategy);
+ }
+
+ @Bean
+ InMemoryChatHomeService chatHomeService(
+ Integer numShards,
+ int[] ownedShards,
+ StorageStrategy storageStrategy)
+ {
+ return new InMemoryChatHomeService(
+ numShards,
+ ownedShards,
+ storageStrategy.read());
+ }
+
+ @Bean
+ public FilesStorageStrategy storageStrategy(Integer numShards)
+ {
+ return new FilesStorageStrategy(
+ Paths.get("target", "test-classes", "data", "files"),
+ Clock.systemDefaultZone(),
+ 8,
+ new KafkaLikeShardingStrategy(numShards),
+ messageFlux -> new InMemoryChatRoomService(messageFlux),
+ new ObjectMapper());
+ }
+
+ @Bean
+ Integer numShards()
+ {
+ return 10;
+ }
+
+ @Bean
+ int[] ownedShards()
+ {
+ return new int[] { 2 };
+ }
+ }
+}