NEU
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / domain / ShardedChatHome.java
index ffa7860..4b8c7f1 100644 (file)
@@ -1,25 +1,42 @@
 package de.juplo.kafka.chat.backend.domain;
 
-import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
+import java.util.HashSet;
+import java.util.Set;
 import java.util.UUID;
+import java.util.stream.Collectors;
 
 
-@RequiredArgsConstructor
+@Slf4j
 public class ShardedChatHome implements ChatHome
 {
   private final ChatHome[] chatHomes;
-  private final ShardingStrategy selectionStrategy;
+  private final Set<Integer> ownedShards;
+  private final ShardingStrategy shardingStrategy;
 
 
-  @Override
-  public Mono<ChatRoom> putChatRoom(ChatRoom chatRoom)
+  public  ShardedChatHome(
+      ChatHome[] chatHomes,
+      ShardingStrategy shardingStrategy)
   {
-    return chatHomes[selectShard(chatRoom.getId())].putChatRoom(chatRoom);
+    this.chatHomes = chatHomes;
+    this.shardingStrategy = shardingStrategy;
+    this.ownedShards = new HashSet<>();
+    for (int shard = 0; shard < chatHomes.length; shard++)
+      if(chatHomes[shard] != null)
+        this.ownedShards.add(shard);
+    log.info(
+        "Created ShardedChatHome for shards: {}",
+        ownedShards
+            .stream()
+            .map(String::valueOf)
+            .collect(Collectors.joining(", ")));
   }
 
+
   @Override
   public Mono<ChatRoom> getChatRoom(UUID id)
   {
@@ -30,13 +47,13 @@ public class ShardedChatHome implements ChatHome
   public Flux<ChatRoom> getChatRooms()
   {
     return Flux
-        .fromArray(chatHomes)
-        .flatMap(chatHome -> chatHome.getChatRooms());
+        .fromIterable(ownedShards)
+        .flatMap(shard -> chatHomes[shard].getChatRooms());
   }
 
 
   private int selectShard(UUID chatroomId)
   {
-    return selectionStrategy.selectShard(chatroomId);
+    return shardingStrategy.selectShard(chatroomId);
   }
 }