refactor: Pushed sharding one layer down in the architecture
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / persistence / inmemory / InMemoryChatHomeService.java
index 8f262a0..25a9bcf 100644 (file)
@@ -1,7 +1,6 @@
 package de.juplo.kafka.chat.backend.persistence.inmemory;
 
-import de.juplo.kafka.chat.backend.domain.ChatRoom;
-import de.juplo.kafka.chat.backend.domain.ChatHomeService;
+import de.juplo.kafka.chat.backend.domain.*;
 import lombok.extern.slf4j.Slf4j;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
@@ -13,31 +12,39 @@ import java.util.*;
 public class InMemoryChatHomeService implements ChatHomeService
 {
   private final Map<UUID, ChatRoom>[] chatrooms;
+  private final Set<Integer> ownedShards;
+  private final ShardingStrategy shardingStrategy;
 
 
   public InMemoryChatHomeService(
       int numShards,
       int[] ownedShards,
+      ShardingStrategy shardingStrategy,
       Flux<ChatRoom> chatroomFlux)
   {
     log.debug("Creating InMemoryChatHomeService");
+
     this.chatrooms = new Map[numShards];
-    Set<Integer> owned = Arrays
+
+    this.ownedShards = Arrays
         .stream(ownedShards)
         .collect(
             () -> new HashSet<>(),
             (set, i) -> set.add(i),
             (a, b) -> a.addAll(b));
+
+    this.shardingStrategy = shardingStrategy;
+
     for (int shard = 0; shard < numShards; shard++)
     {
-      chatrooms[shard] = owned.contains(shard)
+      chatrooms[shard] = this.ownedShards.contains(shard)
           ? new HashMap<>()
           : null;
     }
     chatroomFlux
         .filter(chatRoom ->
         {
-          if (owned.contains(chatRoom.getShard()))
+          if (this.ownedShards.contains(chatRoom.getShard()))
           {
             return true;
           }
@@ -51,20 +58,43 @@ public class InMemoryChatHomeService implements ChatHomeService
         .forEach(chatroom -> chatrooms[chatroom.getShard()].put(chatroom.getId(), chatroom));
   }
 
-  public void putChatRoom(ChatRoom chatRoom)
+  void putChatRoom(ChatRoom chatRoom)
   {
-    chatrooms[chatRoom.getShard()].put(chatRoom.getId(), chatRoom);
+    UUID id = chatRoom.getId();
+    int shard = shardingStrategy.selectShard(id);
+    if (!ownedShards.contains(shard))
+      throw new ShardNotOwnedException(this, chatRoom, shard, ownedShards);
+    chatrooms[shard].put(id, chatRoom);
   }
 
   @Override
-  public Mono<ChatRoom> getChatRoom(int shard, UUID id)
+  public Mono<ChatRoom> getChatRoom(UUID id)
   {
-    return Mono.justOrEmpty(chatrooms[shard].get(id));
+    int shard = shardingStrategy.selectShard(id);
+    if (ownedShards.contains(shard))
+    {
+      return Mono.justOrEmpty(chatrooms[shard].get(id));
+    }
+    else
+    {
+      int[] ownedShards = new int[this.ownedShards.size()];
+      Iterator<Integer> iterator = this.ownedShards.iterator();
+      for (int i = 0; iterator.hasNext(); i++)
+      {
+        ownedShards[i] = iterator.next();
+      }
+      return Mono.error(new UnknownChatroomException(
+          id,
+          shard,
+          ownedShards));
+    }
   }
 
   @Override
-  public Flux<ChatRoom> getChatRooms(int shard)
+  public Flux<ChatRoom> getChatRooms()
   {
-    return Flux.fromStream(chatrooms[shard].values().stream());
+    return Flux
+        .fromIterable(ownedShards)
+        .flatMap(shard -> Flux.fromIterable(chatrooms[shard].values()));
   }
 }