fix: Removed unnecessary generic in `ChatHomeService`
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / persistence / inmemory / InMemoryServicesConfiguration.java
index 3a99019..1dca040 100644 (file)
@@ -1,6 +1,12 @@
 package de.juplo.kafka.chat.backend.persistence.inmemory;
 
 import de.juplo.kafka.chat.backend.ChatBackendProperties;
+import de.juplo.kafka.chat.backend.ChatBackendProperties.ShardingStrategyType;
+import de.juplo.kafka.chat.backend.domain.ShardedChatHome;
+import de.juplo.kafka.chat.backend.persistence.KafkaLikeShardingStrategy;
+import de.juplo.kafka.chat.backend.domain.ShardingStrategy;
+import de.juplo.kafka.chat.backend.domain.ChatHome;
+import de.juplo.kafka.chat.backend.domain.SimpleChatHome;
 import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
 import org.springframework.context.annotation.Bean;
@@ -12,20 +18,98 @@ import java.time.Clock;
 @ConditionalOnProperty(
     prefix = "chat.backend",
     name = "services",
-    havingValue = "in-memory",
+    havingValue = "inmemory",
     matchIfMissing = true)
 @Configuration
 public class InMemoryServicesConfiguration
 {
+  @Bean
+  @ConditionalOnProperty(
+      prefix = "chat.backend.inmemory",
+      name = "sharding-strategy",
+      havingValue = "none",
+      matchIfMissing = true)
+  ChatHome noneShardingChatHome(InMemoryChatHomeService chatHomeService)
+  {
+    return new SimpleChatHome(chatHomeService);
+  }
+
+  @Bean
+  @ConditionalOnProperty(
+      prefix = "chat.backend.inmemory",
+      name = "sharding-strategy",
+      havingValue = "kafkalike")
+  ChatHome kafkalikeShardingChatHome(
+      ChatBackendProperties properties,
+      InMemoryChatHomeService chatHomeService,
+      StorageStrategy storageStrategy)
+  {
+    int numShards = properties.getInmemory().getNumShards();
+    ShardingStrategy shardingStrategy = new KafkaLikeShardingStrategy(numShards);
+    SimpleChatHome[] chatHomes = new SimpleChatHome[numShards];
+    storageStrategy
+        .read()
+        .subscribe(chatRoom ->
+        {
+          int shard = shardingStrategy.selectShard(chatRoom.getId());
+          if (chatHomes[shard] == null)
+            chatHomes[shard] = new SimpleChatHome(chatHomeService, shard);
+        });
+    return new ShardedChatHome(chatHomes, shardingStrategy);
+  }
+
   @Bean
   InMemoryChatHomeService chatHomeService(
-      StorageStrategy storageStrategy,
+      ChatBackendProperties properties,
+      ShardingStrategy shardingStrategy,
+      StorageStrategy storageStrategy)
+  {
+    ShardingStrategyType sharding =
+        properties.getInmemory().getShardingStrategy();
+    int numShards = sharding == ShardingStrategyType.none
+        ? 1
+        : properties.getInmemory().getNumShards();
+    int[] ownedShards = sharding == ShardingStrategyType.none
+        ? new int[] { 0 }
+        : properties.getInmemory().getOwnedShards();
+    return new InMemoryChatHomeService(
+        shardingStrategy,
+        numShards,
+        ownedShards,
+        storageStrategy.read());
+  }
+
+  @Bean
+  InMemoryChatRoomFactory chatRoomFactory(
+      ShardingStrategy strategy,
       Clock clock,
       ChatBackendProperties properties)
   {
-    return new InMemoryChatHomeService(
-        storageStrategy.read(),
+    return new InMemoryChatRoomFactory(
+        strategy,
         clock,
         properties.getChatroomBufferSize());
   }
+
+  @ConditionalOnProperty(
+      prefix = "chat.backend.inmemory",
+      name = "sharding-strategy",
+      havingValue = "none",
+      matchIfMissing = true)
+  @Bean
+  ShardingStrategy defaultShardingStrategy()
+  {
+    return chatRoomId -> 0;
+  }
+
+  @ConditionalOnProperty(
+      prefix = "chat.backend.inmemory",
+      name = "sharding-strategy",
+      havingValue = "kafkalike")
+  @Bean
+  ShardingStrategy kafkalikeShardingStrategy(ChatBackendProperties properties)
+  {
+    return new KafkaLikeShardingStrategy(
+        properties.getInmemory().getNumShards());
+  }
 }