feat: Introduced a kafka-like `ShardingStrategy` for `inmemory`
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / ChatBackendConfiguration.java
index 10e9d37..6e74bb0 100644 (file)
@@ -1,7 +1,9 @@
 package de.juplo.kafka.chat.backend;
 
 import de.juplo.kafka.chat.backend.domain.ChatHome;
+import de.juplo.kafka.chat.backend.domain.ChatHomeFactory;
 import de.juplo.kafka.chat.backend.domain.ChatHomeService;
+import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
 import org.springframework.boot.context.properties.EnableConfigurationProperties;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
@@ -14,13 +16,25 @@ import java.time.Clock;
 public class ChatBackendConfiguration
 {
   @Bean
-  public ChatHome chatHome(ChatHomeService chatHomeService)
+  ChatHome[] chatHomes(
+      ChatHomeFactory factory,
+      ChatBackendProperties properties,
+      StorageStrategy storageStrategy)
   {
-    return new ChatHome(chatHomeService);
+    ChatHome[] chatHomes = new ChatHome[properties.getInmemory().getNumShards()];
+    storageStrategy
+        .read()
+        .subscribe(chatRoom ->
+        {
+          int shard = chatRoom.getShard();
+          if (chatHomes[shard] == null)
+            chatHomes[shard] = factory.createChatHome(shard);
+        });
+    return chatHomes;
   }
 
   @Bean
-  public Clock clock()
+  Clock clock()
   {
     return Clock.systemDefaultZone();
   }