NEU
authorKai Moritz <kai@juplo.de>
Wed, 19 Apr 2023 19:24:17 +0000 (21:24 +0200)
committerKai Moritz <kai@juplo.de>
Wed, 19 Apr 2023 19:24:17 +0000 (21:24 +0200)
src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesConfiguration.java

index fd42d9d..91115ea 100644 (file)
@@ -21,19 +21,28 @@ import java.time.Clock;
 @ConditionalOnProperty(
     prefix = "chat.backend",
     name = "services",
-    havingValue = "inmemory",
-    matchIfMissing = true)
+    havingValue = "kafka")
 @Configuration
 public class KafkaServicesConfiguration implements ApplicationRunner
 {
   @Bean
   ChatHome kafkaChatHome(
       ChatBackendProperties properties,
-      InMemoryChatHomeService chatHomeService,
-      StorageStrategy storageStrategy)
+      KafkaChatHomeService chatHomeService)
   {
     int numShards = properties.getInmemory().getNumShards();
     SimpleChatHome[] chatHomes = new SimpleChatHome[numShards];
+    for (int shard = 0; shard < numShards; shard++)
+    {
+
+    }
+        .read()
+        .subscribe(chatRoom ->
+        {
+          int shard = chatRoom.getShard();
+          if (chatHomes[shard] == null)
+            chatHomes[shard] = new SimpleChatHome(chatHomeService, shard);
+        });
     ShardingStrategy strategy = new KafkaLikeShardingStrategy(numShards);
     return new ShardedChatHome(chatHomes, strategy);
   }