WIP
authorKai Moritz <kai@juplo.de>
Fri, 24 Feb 2023 09:09:19 +0000 (10:09 +0100)
committerKai Moritz <kai@juplo.de>
Fri, 24 Feb 2023 09:09:19 +0000 (10:09 +0100)
src/main/java/de/juplo/kafka/chat/backend/domain/ShardNotOwnedException.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/chat/backend/domain/UnknownChatroomException.java
src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryChatHomeService.java

diff --git a/src/main/java/de/juplo/kafka/chat/backend/domain/ShardNotOwnedException.java b/src/main/java/de/juplo/kafka/chat/backend/domain/ShardNotOwnedException.java
new file mode 100644 (file)
index 0000000..17d94eb
--- /dev/null
@@ -0,0 +1,44 @@
+package de.juplo.kafka.chat.backend.domain;
+
+import lombok.Getter;
+
+import java.util.Arrays;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+
+public class ShardNotOwnedException extends IllegalStateException
+{
+  @Getter
+  private final ChatHomeService chatHomeService;
+  @Getter
+  private final ChatRoomInfo chatRoomInfo;
+  @Getter
+  private final int shard;
+  @Getter
+  private final int[] ownedShards;
+
+
+  public ShardNotOwnedException(
+      ChatHomeService chatHomeService,
+      ChatRoomInfo chatRoomInfo,
+      int shard,
+      int[] ownedShards)
+  {
+    super(
+        chatHomeService +
+        " does not own the shard " +
+        shard +
+        " for ChatRoom " +
+        chatRoomInfo +
+        " owned shards: " +
+        Arrays
+            .stream(ownedShards)
+            .mapToObj(ownedShard -> Integer.toString(ownedShard))
+            .collect(Collectors.joining(", ")));
+    this.chatHomeService = chatHomeService;
+    this.chatRoomInfo = chatRoomInfo;
+    this.shard = shard;
+    this.ownedShards = ownedShards;
+  }
+}
index 20208e2..714c220 100644 (file)
@@ -8,7 +8,7 @@ import java.util.UUID;
 import java.util.stream.Collectors;
 
 
-public class UnknownChatroomException extends RuntimeException
+public class UnknownChatroomException extends IllegalStateException
 {
   @Getter
   private final UUID chatroomId;
index c33103c..33b63d7 100644 (file)
@@ -21,26 +21,32 @@ public class InMemoryChatHomeService implements ChatHomeService
   public InMemoryChatHomeService(
       int numShards,
       int[] ownedShards,
+      ShardingStrategy shardingStrategy,
       Flux<ChatRoom> chatroomFlux)
   {
     log.debug("Creating InMemoryChatHomeService");
+
     this.chatrooms = new Map[numShards];
-    Set<Integer> owned = Arrays
+
+    this.ownedShards = Arrays
         .stream(ownedShards)
         .collect(
             () -> new HashSet<>(),
             (set, i) -> set.add(i),
             (a, b) -> a.addAll(b));
+
+    this.shardingStrategy = shardingStrategy;
+
     for (int shard = 0; shard < numShards; shard++)
     {
-      chatrooms[shard] = owned.contains(shard)
+      chatrooms[shard] = this.ownedShards.contains(shard)
           ? new HashMap<>()
           : null;
     }
     chatroomFlux
         .filter(chatRoom ->
         {
-          if (owned.contains(chatRoom.getShard()))
+          if (this.ownedShards.contains(chatRoom.getShard()))
           {
             return true;
           }
@@ -54,9 +60,17 @@ public class InMemoryChatHomeService implements ChatHomeService
         .forEach(chatroom -> chatrooms[chatroom.getShard()].put(chatroom.getId(), chatroom));
   }
 
-  public void putChatRoom(ChatRoom chatRoom)
+  void putChatRoom(ChatRoom chatRoom)
   {
-    chatrooms[chatRoom.getShard()].put(chatRoom.getId(), chatRoom);
+    UUID id = chatRoom.getId();
+    int shard = shardingStrategy.selectShard(id);
+    if (!ownedShards.contains(shard))
+      throw new IllegalStateException(
+          this,
+          chatRoom,
+          shard,
+          ownedShards.stream().toArray());
+    chatrooms[shard].put(id, chatRoom);
   }
 
   @Override