WIP
authorKai Moritz <kai@juplo.de>
Fri, 24 Feb 2023 10:31:07 +0000 (11:31 +0100)
committerKai Moritz <kai@juplo.de>
Fri, 24 Feb 2023 10:31:07 +0000 (11:31 +0100)
src/main/java/de/juplo/kafka/chat/backend/domain/ShardedChatHome.java [deleted file]
src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryServicesConfiguration.java

diff --git a/src/main/java/de/juplo/kafka/chat/backend/domain/ShardedChatHome.java b/src/main/java/de/juplo/kafka/chat/backend/domain/ShardedChatHome.java
deleted file mode 100644 (file)
index 3dc6668..0000000
+++ /dev/null
@@ -1,77 +0,0 @@
-package de.juplo.kafka.chat.backend.domain;
-
-import lombok.extern.slf4j.Slf4j;
-import reactor.core.publisher.Flux;
-import reactor.core.publisher.Mono;
-
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.Set;
-import java.util.UUID;
-import java.util.stream.Collectors;
-
-
-@Slf4j
-public class ShardedChatHome implements ChatHome
-{
-  private final ChatHome[] chatHomes;
-  private final Set<Integer> ownedShards;
-  private final ShardingStrategy shardingStrategy;
-
-
-  public  ShardedChatHome(
-      ChatHome[] chatHomes,
-      ShardingStrategy shardingStrategy)
-  {
-    this.chatHomes = chatHomes;
-    this.shardingStrategy = shardingStrategy;
-    this.ownedShards = new HashSet<>();
-    for (int shard = 0; shard < chatHomes.length; shard++)
-      if(chatHomes[shard] != null)
-        this.ownedShards.add(shard);
-    log.info(
-        "Created ShardedChatHome for shards: {}",
-        ownedShards
-            .stream()
-            .map(String::valueOf)
-            .collect(Collectors.joining(", ")));
-  }
-
-
-  @Override
-  public Mono<ChatRoom> getChatRoom(UUID id)
-  {
-    int shard = selectShard(id);
-    if (ownedShards.contains(shard))
-    {
-      return chatHomes[shard].getChatRoom(id);
-    }
-    else
-    {
-      int[] ownedShards = new int[this.ownedShards.size()];
-      Iterator<Integer> iterator = this.ownedShards.iterator();
-      for (int i = 0; iterator.hasNext(); i++)
-      {
-        ownedShards[i] = iterator.next();
-      }
-      return Mono.error(new UnknownChatroomException(
-          id,
-          shard,
-          ownedShards));
-    }
-  }
-
-  @Override
-  public Flux<ChatRoom> getChatRooms()
-  {
-    return Flux
-        .fromIterable(ownedShards)
-        .flatMap(shard -> chatHomes[shard].getChatRooms());
-  }
-
-
-  private int selectShard(UUID chatroomId)
-  {
-    return shardingStrategy.selectShard(chatroomId);
-  }
-}
index d8b49b3..6c6951a 100644 (file)
@@ -23,21 +23,11 @@ import java.time.Clock;
 public class InMemoryServicesConfiguration
 {
   @Bean
-  @ConditionalOnProperty(
-      prefix = "chat.backend.inmemory",
-      name = "sharding-strategy",
-      havingValue = "none",
-      matchIfMissing = true)
-  ChatHome noneShardingChatHome(InMemoryChatHomeService chatHomeService)
+  ChatHome chatHome(InMemoryChatHomeService chatHomeService)
   {
     return new ChatHome(chatHomeService);
   }
 
-  @Bean
-  @ConditionalOnProperty(
-      prefix = "chat.backend.inmemory",
-      name = "sharding-strategy",
-      havingValue = "kafkalike")
   ChatHome kafkalikeShardingChatHome(
       ChatBackendProperties properties,
       InMemoryChatHomeService chatHomeService,
@@ -58,7 +48,49 @@ public class InMemoryServicesConfiguration
   }
 
   @Bean
-  InMemoryChatHomeService chatHomeService(
+  @ConditionalOnProperty(
+      prefix = "chat.backend.inmemory",
+      name = "sharding-strategy",
+      havingValue = "none",
+      matchIfMissing = true)
+  InMemoryChatHomeService noneShardingChatHomeService(
+      ChatBackendProperties properties,
+      StorageStrategy storageStrategy)
+  {
+    ShardingStrategyType sharding =
+        properties.getInmemory().getShardingStrategy();
+
+    int numShards = sharding == ShardingStrategyType.none
+        ? 1
+        : properties.getInmemory().getNumShards();
+    int[] ownedShards = sharding == ShardingStrategyType.none
+        ? new int[] { 0 }
+        : properties.getInmemory().getOwnedShards();
+
+    ChatHome[] chatHomes = new ChatHome[numShards];
+    storageStrategy
+        .read()
+        .subscribe(chatRoom ->
+        {
+          int shard = chatRoom.getShard();
+          if (chatHomes[shard] == null)
+            chatHomes[shard] = new ChatHome(chatHomeService, shard);
+        });
+    ShardingStrategy strategy = new KafkaLikeShardingStrategy(numShards);
+
+    return new InMemoryChatHomeService(
+        numShards,
+        ownedShards,
+        shardingStrategy,
+        storageStrategy.read());
+  }
+
+  @Bean
+  @ConditionalOnProperty(
+      prefix = "chat.backend.inmemory",
+      name = "sharding-strategy",
+      havingValue = "kafkalike")
+  InMemoryChatHomeService kafkalikeShardingChatHomeService(
       ChatBackendProperties properties,
       StorageStrategy storageStrategy)
   {
@@ -73,6 +105,7 @@ public class InMemoryServicesConfiguration
     return new InMemoryChatHomeService(
         numShards,
         ownedShards,
+        shardingStrategy,
         storageStrategy.read());
   }