refactor: Pushed sharding one layer down in the architecture
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / persistence / inmemory / InMemoryServicesConfiguration.java
index de50448..cb1a070 100644 (file)
@@ -2,11 +2,9 @@ package de.juplo.kafka.chat.backend.persistence.inmemory;
 
 import de.juplo.kafka.chat.backend.ChatBackendProperties;
 import de.juplo.kafka.chat.backend.ChatBackendProperties.ShardingStrategyType;
-import de.juplo.kafka.chat.backend.domain.ShardedChatHome;
 import de.juplo.kafka.chat.backend.persistence.KafkaLikeShardingStrategy;
 import de.juplo.kafka.chat.backend.domain.ShardingStrategy;
 import de.juplo.kafka.chat.backend.domain.ChatHome;
-import de.juplo.kafka.chat.backend.domain.SimpleChatHome;
 import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
 import org.springframework.context.annotation.Bean;
@@ -24,38 +22,9 @@ import java.time.Clock;
 public class InMemoryServicesConfiguration
 {
   @Bean
-  @ConditionalOnProperty(
-      prefix = "chat.backend.inmemory",
-      name = "sharding-strategy",
-      havingValue = "none",
-      matchIfMissing = true)
-  ChatHome noneShardingChatHome(InMemoryChatHomeService chatHomeService)
-  {
-    return new SimpleChatHome(chatHomeService);
-  }
-
-  @Bean
-  @ConditionalOnProperty(
-      prefix = "chat.backend.inmemory",
-      name = "sharding-strategy",
-      havingValue = "kafkalike")
-  ChatHome kafkalikeShardingChatHome(
-      ChatBackendProperties properties,
-      InMemoryChatHomeService chatHomeService,
-      StorageStrategy storageStrategy)
+  ChatHome chatHome(InMemoryChatHomeService chatHomeService)
   {
-    int numShards = properties.getInmemory().getNumShards();
-    SimpleChatHome[] chatHomes = new SimpleChatHome[numShards];
-    storageStrategy
-        .read()
-        .subscribe(chatRoom ->
-        {
-          int shard = chatRoom.getShard();
-          if (chatHomes[shard] == null)
-            chatHomes[shard] = new SimpleChatHome(chatHomeService, shard);
-        });
-    ShardingStrategy strategy = new KafkaLikeShardingStrategy(numShards);
-    return new ShardedChatHome(chatHomes, strategy);
+    return new ChatHome(chatHomeService);
   }
 
   @Bean
@@ -65,15 +34,31 @@ public class InMemoryServicesConfiguration
   {
     ShardingStrategyType sharding =
         properties.getInmemory().getShardingStrategy();
-    int numShards = sharding == ShardingStrategyType.none
-        ? 1
-        : properties.getInmemory().getNumShards();
-    int[] ownedShards = sharding == ShardingStrategyType.none
-        ? new int[] { 0 }
-        : properties.getInmemory().getOwnedShards();
+
+    int numShards;
+    int[] ownedShards;
+    ShardingStrategy shardingStrategy;
+
+    switch (sharding)
+    {
+      case none:
+        numShards = 1;
+        ownedShards = new int[] { 0 };
+        shardingStrategy = id -> 0;
+        break;
+      case kafkalike:
+        numShards = properties.getInmemory().getNumShards();
+        ownedShards = properties.getInmemory().getOwnedShards();
+        shardingStrategy = new KafkaLikeShardingStrategy(numShards);
+        break;
+      default:
+        throw new IllegalArgumentException("Unknown sharding strategy: " + sharding);
+    }
+
     return new InMemoryChatHomeService(
         numShards,
         ownedShards,
+        shardingStrategy,
         storageStrategy.read());
   }