refactor: Moved exceptions into package `exceptions` - Aligned Code
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / persistence / inmemory / InMemoryServicesConfiguration.java
index e40d950..d2fd731 100644 (file)
@@ -1,15 +1,15 @@
 package de.juplo.kafka.chat.backend.persistence.inmemory;
 
 import de.juplo.kafka.chat.backend.ChatBackendProperties;
-import de.juplo.kafka.chat.backend.api.KafkaLikeShardingStrategy;
-import de.juplo.kafka.chat.backend.api.ShardingStrategy;
 import de.juplo.kafka.chat.backend.domain.ChatHome;
+import de.juplo.kafka.chat.backend.persistence.ShardingStrategy;
 import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 
 import java.time.Clock;
+import java.util.stream.IntStream;
 
 
 @ConditionalOnProperty(
@@ -21,44 +21,43 @@ import java.time.Clock;
 public class InMemoryServicesConfiguration
 {
   @Bean
-  ChatHome[] chatHomes(
+  @ConditionalOnProperty(
+      prefix = "chat.backend.inmemory",
+      name = "sharding-strategy",
+      havingValue = "none",
+      matchIfMissing = true)
+  ChatHome noneShardingChatHome(
       ChatBackendProperties properties,
-      InMemoryChatHomeService chatHomeService,
-      StorageStrategy storageStrategy)
+      StorageStrategy storageStrategy,
+      Clock clock)
   {
-    ChatHome[] chatHomes = new ChatHome[properties.getInmemory().getNumShards()];
-    storageStrategy
-        .read()
-        .subscribe(chatRoom ->
-        {
-          int shard = chatRoom.getShard();
-          if (chatHomes[shard] == null)
-            chatHomes[shard] = new ChatHome(chatHomeService, shard);
-        });
-    return chatHomes;
+    return new SimpleChatHome(
+        storageStrategy,
+        clock,
+        properties.getChatroomBufferSize());
   }
 
   @Bean
-  InMemoryChatHomeService chatHomeService(
+  @ConditionalOnProperty(
+      prefix = "chat.backend.inmemory",
+      name = "sharding-strategy",
+      havingValue = "kafkalike")
+  ChatHome kafkalikeShardingChatHome(
       ChatBackendProperties properties,
-      StorageStrategy storageStrategy)
-  {
-    return new InMemoryChatHomeService(
-        properties.getInmemory().getNumShards(),
-        properties.getInmemory().getOwnedShards(),
-        storageStrategy.read());
-  }
-
-  @Bean
-  InMemoryChatRoomFactory chatRoomFactory(
-      ShardingStrategy strategy,
-      Clock clock,
-      ChatBackendProperties properties)
+      StorageStrategy storageStrategy,
+      Clock clock)
   {
-    return new InMemoryChatRoomFactory(
-        strategy,
-        clock,
-        properties.getChatroomBufferSize());
+    int numShards = properties.getInmemory().getNumShards();
+    SimpleChatHome[] chatHomes = new SimpleChatHome[numShards];
+    IntStream
+        .of(properties.getInmemory().getOwnedShards())
+        .forEach(shard -> chatHomes[shard] = new SimpleChatHome(
+            shard,
+            storageStrategy,
+            clock,
+            properties.getChatroomBufferSize()));
+    ShardingStrategy strategy = new KafkaLikeShardingStrategy(numShards);
+    return new ShardedChatHome(chatHomes, strategy);
   }
 
   @ConditionalOnProperty(