refactor: Pulled business-logic into class `ShardedChatHome`
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / persistence / inmemory / InMemoryServicesConfiguration.java
index a0f37f0..96ef05c 100644 (file)
@@ -1,8 +1,10 @@
 package de.juplo.kafka.chat.backend.persistence.inmemory;
 
 import de.juplo.kafka.chat.backend.ChatBackendProperties;
-import de.juplo.kafka.chat.backend.api.KafkaLikeShardingStrategy;
-import de.juplo.kafka.chat.backend.api.ShardingStrategy;
+import de.juplo.kafka.chat.backend.ChatBackendProperties.ShardingStrategyType;
+import de.juplo.kafka.chat.backend.domain.ShardedChatHome;
+import de.juplo.kafka.chat.backend.persistence.KafkaLikeShardingStrategy;
+import de.juplo.kafka.chat.backend.domain.ShardingStrategy;
 import de.juplo.kafka.chat.backend.domain.ChatHome;
 import de.juplo.kafka.chat.backend.domain.SimpleChatHome;
 import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
@@ -22,12 +24,28 @@ import java.time.Clock;
 public class InMemoryServicesConfiguration
 {
   @Bean
-  ChatHome[] chatHomes(
+  @ConditionalOnProperty(
+      prefix = "chat.backend.inmemory",
+      name = "sharding-strategy",
+      havingValue = "none",
+      matchIfMissing = true)
+  ChatHome noneShardingChatHome(InMemoryChatHomeService chatHomeService)
+  {
+    return new SimpleChatHome(chatHomeService);
+  }
+
+  @Bean
+  @ConditionalOnProperty(
+      prefix = "chat.backend.inmemory",
+      name = "sharding-strategy",
+      havingValue = "kafkalike")
+  ChatHome kafkalikeShardingChatHome(
       ChatBackendProperties properties,
       InMemoryChatHomeService chatHomeService,
       StorageStrategy storageStrategy)
   {
-    SimpleChatHome[] chatHomes = new SimpleChatHome[properties.getInmemory().getNumShards()];
+    int numShards = properties.getInmemory().getNumShards();
+    SimpleChatHome[] chatHomes = new SimpleChatHome[numShards];
     storageStrategy
         .read()
         .subscribe(chatRoom ->
@@ -36,7 +54,8 @@ public class InMemoryServicesConfiguration
           if (chatHomes[shard] == null)
             chatHomes[shard] = new SimpleChatHome(chatHomeService, shard);
         });
-    return chatHomes;
+    ShardingStrategy strategy = new KafkaLikeShardingStrategy(numShards);
+    return new ShardedChatHome(chatHomes, strategy);
   }
 
   @Bean
@@ -44,9 +63,17 @@ public class InMemoryServicesConfiguration
       ChatBackendProperties properties,
       StorageStrategy storageStrategy)
   {
+    ShardingStrategyType sharding =
+        properties.getInmemory().getShardingStrategy();
+    int numShards = sharding == ShardingStrategyType.none
+        ? 1
+        : properties.getInmemory().getNumShards();
+    int[] ownedShards = sharding == ShardingStrategyType.none
+        ? new int[] { 0 }
+        : properties.getInmemory().getOwnedShards();
     return new InMemoryChatHomeService(
-        properties.getInmemory().getNumShards(),
-        properties.getInmemory().getOwnedShards(),
+        numShards,
+        ownedShards,
         storageStrategy.read());
   }