refactor: Moved exceptions into package `exceptions` - Aligned Code
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / persistence / inmemory / InMemoryServicesConfiguration.java
index 1dca040..d2fd731 100644 (file)
@@ -1,18 +1,15 @@
 package de.juplo.kafka.chat.backend.persistence.inmemory;
 
 import de.juplo.kafka.chat.backend.ChatBackendProperties;
-import de.juplo.kafka.chat.backend.ChatBackendProperties.ShardingStrategyType;
-import de.juplo.kafka.chat.backend.domain.ShardedChatHome;
-import de.juplo.kafka.chat.backend.persistence.KafkaLikeShardingStrategy;
-import de.juplo.kafka.chat.backend.domain.ShardingStrategy;
 import de.juplo.kafka.chat.backend.domain.ChatHome;
-import de.juplo.kafka.chat.backend.domain.SimpleChatHome;
+import de.juplo.kafka.chat.backend.persistence.ShardingStrategy;
 import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 
 import java.time.Clock;
+import java.util.stream.IntStream;
 
 
 @ConditionalOnProperty(
@@ -29,9 +26,15 @@ public class InMemoryServicesConfiguration
       name = "sharding-strategy",
       havingValue = "none",
       matchIfMissing = true)
-  ChatHome noneShardingChatHome(InMemoryChatHomeService chatHomeService)
+  ChatHome noneShardingChatHome(
+      ChatBackendProperties properties,
+      StorageStrategy storageStrategy,
+      Clock clock)
   {
-    return new SimpleChatHome(chatHomeService);
+    return new SimpleChatHome(
+        storageStrategy,
+        clock,
+        properties.getChatroomBufferSize());
   }
 
   @Bean
@@ -41,54 +44,20 @@ public class InMemoryServicesConfiguration
       havingValue = "kafkalike")
   ChatHome kafkalikeShardingChatHome(
       ChatBackendProperties properties,
-      InMemoryChatHomeService chatHomeService,
-      StorageStrategy storageStrategy)
+      StorageStrategy storageStrategy,
+      Clock clock)
   {
     int numShards = properties.getInmemory().getNumShards();
-    ShardingStrategy shardingStrategy = new KafkaLikeShardingStrategy(numShards);
     SimpleChatHome[] chatHomes = new SimpleChatHome[numShards];
-    storageStrategy
-        .read()
-        .subscribe(chatRoom ->
-        {
-          int shard = shardingStrategy.selectShard(chatRoom.getId());
-          if (chatHomes[shard] == null)
-            chatHomes[shard] = new SimpleChatHome(chatHomeService, shard);
-        });
-    return new ShardedChatHome(chatHomes, shardingStrategy);
-  }
-
-  @Bean
-  InMemoryChatHomeService chatHomeService(
-      ChatBackendProperties properties,
-      ShardingStrategy shardingStrategy,
-      StorageStrategy storageStrategy)
-  {
-    ShardingStrategyType sharding =
-        properties.getInmemory().getShardingStrategy();
-    int numShards = sharding == ShardingStrategyType.none
-        ? 1
-        : properties.getInmemory().getNumShards();
-    int[] ownedShards = sharding == ShardingStrategyType.none
-        ? new int[] { 0 }
-        : properties.getInmemory().getOwnedShards();
-    return new InMemoryChatHomeService(
-        shardingStrategy,
-        numShards,
-        ownedShards,
-        storageStrategy.read());
-  }
-
-  @Bean
-  InMemoryChatRoomFactory chatRoomFactory(
-      ShardingStrategy strategy,
-      Clock clock,
-      ChatBackendProperties properties)
-  {
-    return new InMemoryChatRoomFactory(
-        strategy,
-        clock,
-        properties.getChatroomBufferSize());
+    IntStream
+        .of(properties.getInmemory().getOwnedShards())
+        .forEach(shard -> chatHomes[shard] = new SimpleChatHome(
+            shard,
+            storageStrategy,
+            clock,
+            properties.getChatroomBufferSize()));
+    ShardingStrategy strategy = new KafkaLikeShardingStrategy(numShards);
+    return new ShardedChatHome(chatHomes, strategy);
   }
 
   @ConditionalOnProperty(