WIP
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / persistence / inmemory / InMemoryServicesConfiguration.java
index 96ef05c..00e0c6f 100644 (file)
@@ -2,17 +2,14 @@ package de.juplo.kafka.chat.backend.persistence.inmemory;
 
 import de.juplo.kafka.chat.backend.ChatBackendProperties;
 import de.juplo.kafka.chat.backend.ChatBackendProperties.ShardingStrategyType;
-import de.juplo.kafka.chat.backend.domain.ShardedChatHome;
-import de.juplo.kafka.chat.backend.persistence.KafkaLikeShardingStrategy;
-import de.juplo.kafka.chat.backend.domain.ShardingStrategy;
 import de.juplo.kafka.chat.backend.domain.ChatHome;
-import de.juplo.kafka.chat.backend.domain.SimpleChatHome;
 import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 
 import java.time.Clock;
+import java.util.stream.IntStream;
 
 
 @ConditionalOnProperty(
@@ -29,9 +26,9 @@ public class InMemoryServicesConfiguration
       name = "sharding-strategy",
       havingValue = "none",
       matchIfMissing = true)
-  ChatHome noneShardingChatHome(InMemoryChatHomeService chatHomeService)
+  ChatHome noneShardingChatHome()
   {
-    return new SimpleChatHome(chatHomeService);
+    return new SimpleChatHome();
   }
 
   @Bean
@@ -41,19 +38,13 @@ public class InMemoryServicesConfiguration
       havingValue = "kafkalike")
   ChatHome kafkalikeShardingChatHome(
       ChatBackendProperties properties,
-      InMemoryChatHomeService chatHomeService,
-      StorageStrategy storageStrategy)
+      InMemoryChatHomeService chatHomeService)
   {
     int numShards = properties.getInmemory().getNumShards();
     SimpleChatHome[] chatHomes = new SimpleChatHome[numShards];
-    storageStrategy
-        .read()
-        .subscribe(chatRoom ->
-        {
-          int shard = chatRoom.getShard();
-          if (chatHomes[shard] == null)
-            chatHomes[shard] = new SimpleChatHome(chatHomeService, shard);
-        });
+    IntStream
+        .of(properties.getInmemory().getOwnedShards())
+        .forEach(shard -> chatHomes[shard] = new SimpleChatHome(chatHomeService, shard));
     ShardingStrategy strategy = new KafkaLikeShardingStrategy(numShards);
     return new ShardedChatHome(chatHomes, strategy);
   }
@@ -79,11 +70,13 @@ public class InMemoryServicesConfiguration
 
   @Bean
   InMemoryChatRoomFactory chatRoomFactory(
+      InMemoryChatHomeService service,
       ShardingStrategy strategy,
       Clock clock,
       ChatBackendProperties properties)
   {
     return new InMemoryChatRoomFactory(
+        service,
         strategy,
         clock,
         properties.getChatroomBufferSize());