WIP
authorKai Moritz <kai@juplo.de>
Fri, 24 Feb 2023 09:59:25 +0000 (10:59 +0100)
committerKai Moritz <kai@juplo.de>
Fri, 24 Feb 2023 09:59:25 +0000 (10:59 +0100)
src/main/java/de/juplo/kafka/chat/backend/domain/ChatHome.java
src/main/java/de/juplo/kafka/chat/backend/domain/ChatHomeService.java
src/main/java/de/juplo/kafka/chat/backend/domain/ShardNotOwnedException.java
src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryChatHomeService.java

index 955369d..c6445a0 100644 (file)
@@ -8,35 +8,25 @@ import java.util.*;
 
 
 @Slf4j
-public class ChatHome implements ChatHome
+public class ChatHome
 {
   private final ChatHomeService service;
-  private final int shard;
 
 
-  public ChatHome(ChatHomeService service, int shard)
-  {
-    log.info("Created SimpleChatHome for shard {}", shard);
-    this.service = service;
-    this.shard = shard;
-  }
-
   public ChatHome(ChatHomeService service)
   {
-    this(service, 0);
+    log.info("Created ChatHome with ChatHomeService {}", service);
   }
 
 
-  @Override
   public Mono<ChatRoom> getChatRoom(UUID id)
   {
     return service
-        .getChatRoom(shard, id)
+        .getChatRoom(id)
         .switchIfEmpty(Mono.error(() -> new UnknownChatroomException(id)));
   }
 
-  @Override
-  public Flux<ChatRoom> getChatRooms()
+  public Flux<ChatRoom> getChatRooms(int shard)
   {
     return service.getChatRooms(shard);
   }
index 19ff4aa..fc5ba38 100644 (file)
@@ -8,6 +8,6 @@ import java.util.UUID;
 
 public interface ChatHomeService
 {
-  Mono<ChatRoom> getChatRoom(int shard, UUID id);
+  Mono<ChatRoom> getChatRoom(UUID id);
   Flux<ChatRoom> getChatRooms(int shard);
 }
index 17d94eb..d467eab 100644 (file)
@@ -3,7 +3,8 @@ package de.juplo.kafka.chat.backend.domain;
 import lombok.Getter;
 
 import java.util.Arrays;
-import java.util.UUID;
+import java.util.Collection;
+import java.util.Iterator;
 import java.util.stream.Collectors;
 
 
@@ -19,6 +20,19 @@ public class ShardNotOwnedException extends IllegalStateException
   private final int[] ownedShards;
 
 
+  public ShardNotOwnedException(
+      ChatHomeService chatHomeService,
+      ChatRoomInfo chatRoomInfo,
+      int shard,
+      Collection<Integer> ownedShards)
+  {
+    this(
+        chatHomeService,
+        chatRoomInfo,
+        shard,
+        ShardNotOwnedException.toArray(ownedShards));
+  }
+
   public ShardNotOwnedException(
       ChatHomeService chatHomeService,
       ChatRoomInfo chatRoomInfo,
@@ -41,4 +55,14 @@ public class ShardNotOwnedException extends IllegalStateException
     this.shard = shard;
     this.ownedShards = ownedShards;
   }
+
+
+  private static int[] toArray(Collection<Integer> collection)
+  {
+    int[] array = new int[collection.size()];
+    Iterator<Integer> iterator = collection.iterator();
+    for (int i = 0; iterator.hasNext(); i++)
+      array[i] = iterator.next();
+    return array;
+  }
 }
index 33b63d7..5e1f7dc 100644 (file)
@@ -1,8 +1,6 @@
 package de.juplo.kafka.chat.backend.persistence.inmemory;
 
-import de.juplo.kafka.chat.backend.domain.ChatRoom;
-import de.juplo.kafka.chat.backend.domain.ChatHomeService;
-import de.juplo.kafka.chat.backend.domain.ShardingStrategy;
+import de.juplo.kafka.chat.backend.domain.*;
 import lombok.extern.slf4j.Slf4j;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
@@ -65,18 +63,31 @@ public class InMemoryChatHomeService implements ChatHomeService
     UUID id = chatRoom.getId();
     int shard = shardingStrategy.selectShard(id);
     if (!ownedShards.contains(shard))
-      throw new IllegalStateException(
-          this,
-          chatRoom,
-          shard,
-          ownedShards.stream().toArray());
+      throw new ShardNotOwnedException(this, chatRoom, shard, ownedShards);
     chatrooms[shard].put(id, chatRoom);
   }
 
   @Override
-  public Mono<ChatRoom> getChatRoom(int shard, UUID id)
+  public Mono<ChatRoom> getChatRoom(UUID id)
   {
-    return Mono.justOrEmpty(chatrooms[shard].get(id));
+    int shard = shardingStrategy.selectShard(id);
+    if (ownedShards.contains(shard))
+    {
+      return Mono.justOrEmpty(chatrooms[shard].get(id));
+    }
+    else
+    {
+      int[] ownedShards = new int[this.ownedShards.size()];
+      Iterator<Integer> iterator = this.ownedShards.iterator();
+      for (int i = 0; iterator.hasNext(); i++)
+      {
+        ownedShards[i] = iterator.next();
+      }
+      return Mono.error(new UnknownChatroomException(
+          id,
+          shard,
+          ownedShards));
+    }
   }
 
   @Override
@@ -84,10 +95,4 @@ public class InMemoryChatHomeService implements ChatHomeService
   {
     return Flux.fromStream(chatrooms[shard].values().stream());
   }
-
-
-  private int selectShard(UUID chatroomId)
-  {
-    return shardingStrategy.selectShard(chatroomId);
-  }
 }