fix: Fixed a NPE in `ShardedChatHome.getChatRooms()`
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / domain / ShardedChatHome.java
index ffa7860..3023f78 100644 (file)
@@ -1,17 +1,40 @@
 package de.juplo.kafka.chat.backend.domain;
 
-import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
+import java.util.HashSet;
+import java.util.Set;
 import java.util.UUID;
+import java.util.stream.Collectors;
 
 
-@RequiredArgsConstructor
+@Slf4j
 public class ShardedChatHome implements ChatHome
 {
   private final ChatHome[] chatHomes;
-  private final ShardingStrategy selectionStrategy;
+  private final Set<Integer> ownedShards;
+  private final ShardingStrategy shardingStrategy;
+
+
+  public  ShardedChatHome(
+      ChatHome[] chatHomes,
+      ShardingStrategy shardingStrategy)
+  {
+    this.chatHomes = chatHomes;
+    this.shardingStrategy = shardingStrategy;
+    this.ownedShards = new HashSet<>();
+    for (int shard = 0; shard < chatHomes.length; shard++)
+      if(chatHomes[shard] != null)
+        this.ownedShards.add(shard);
+    log.info(
+        "Created ShardedChatHome for shards: {}",
+        ownedShards
+            .stream()
+            .map(String::valueOf)
+            .collect(Collectors.joining(", ")));
+  }
 
 
   @Override
@@ -30,13 +53,13 @@ public class ShardedChatHome implements ChatHome
   public Flux<ChatRoom> getChatRooms()
   {
     return Flux
-        .fromArray(chatHomes)
-        .flatMap(chatHome -> chatHome.getChatRooms());
+        .fromIterable(ownedShards)
+        .flatMap(shard -> chatHomes[shard].getChatRooms());
   }
 
 
   private int selectShard(UUID chatroomId)
   {
-    return selectionStrategy.selectShard(chatroomId);
+    return shardingStrategy.selectShard(chatroomId);
   }
 }