fix: Fixed a NPE in `ShardedChatHome.getChatRooms()`
authorKai Moritz <kai@juplo.de>
Sat, 14 Jan 2023 17:09:46 +0000 (18:09 +0100)
committerKai Moritz <kai@juplo.de>
Sun, 15 Jan 2023 18:39:01 +0000 (19:39 +0100)
- When collecting the `ChatRoom`s for all shards, unused shards with a
  `null` value were not skipped.
- Also added log-messages of level `INFO` for the creation of `ChatRoom`,
  `SimpleChatHome` and `ShardedChatHome`.

src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoom.java
src/main/java/de/juplo/kafka/chat/backend/domain/ShardedChatHome.java
src/main/java/de/juplo/kafka/chat/backend/domain/SimpleChatHome.java
src/test/java/de/juplo/kafka/chat/backend/AbstractConfigurationIT.java

index 1c21fb9..cc5c5a0 100644 (file)
@@ -38,6 +38,7 @@ public class ChatRoom
       ChatRoomService service,
       int bufferSize)
   {
+    log.info("Created ChatRoom {} with buffer-size {}", id, bufferSize);
     this.id = id;
     this.name = name;
     this.clock = clock;
index ffa7860..3023f78 100644 (file)
@@ -1,17 +1,40 @@
 package de.juplo.kafka.chat.backend.domain;
 
-import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
+import java.util.HashSet;
+import java.util.Set;
 import java.util.UUID;
+import java.util.stream.Collectors;
 
 
-@RequiredArgsConstructor
+@Slf4j
 public class ShardedChatHome implements ChatHome
 {
   private final ChatHome[] chatHomes;
-  private final ShardingStrategy selectionStrategy;
+  private final Set<Integer> ownedShards;
+  private final ShardingStrategy shardingStrategy;
+
+
+  public  ShardedChatHome(
+      ChatHome[] chatHomes,
+      ShardingStrategy shardingStrategy)
+  {
+    this.chatHomes = chatHomes;
+    this.shardingStrategy = shardingStrategy;
+    this.ownedShards = new HashSet<>();
+    for (int shard = 0; shard < chatHomes.length; shard++)
+      if(chatHomes[shard] != null)
+        this.ownedShards.add(shard);
+    log.info(
+        "Created ShardedChatHome for shards: {}",
+        ownedShards
+            .stream()
+            .map(String::valueOf)
+            .collect(Collectors.joining(", ")));
+  }
 
 
   @Override
@@ -30,13 +53,13 @@ public class ShardedChatHome implements ChatHome
   public Flux<ChatRoom> getChatRooms()
   {
     return Flux
-        .fromArray(chatHomes)
-        .flatMap(chatHome -> chatHome.getChatRooms());
+        .fromIterable(ownedShards)
+        .flatMap(shard -> chatHomes[shard].getChatRooms());
   }
 
 
   private int selectShard(UUID chatroomId)
   {
-    return selectionStrategy.selectShard(chatroomId);
+    return shardingStrategy.selectShard(chatroomId);
   }
 }
index b15eab3..bb2e158 100644 (file)
@@ -9,13 +9,19 @@ import java.util.*;
 
 
 @Slf4j
-@RequiredArgsConstructor
 public class SimpleChatHome implements ChatHome
 {
   private final ChatHomeService service;
   private final int shard;
 
 
+  public SimpleChatHome(ChatHomeService service, int shard)
+  {
+    log.info("Created SimpleChatHome for shard {}", shard);
+    this.service = service;
+    this.shard = shard;
+  }
+
   public SimpleChatHome(ChatHomeService service)
   {
     this(service, 0);
index 8c309b8..c424b29 100644 (file)
@@ -33,6 +33,15 @@ public abstract class AbstractConfigurationIT
               .exchange()
               .expectStatus().isOk()
               .expectBody().jsonPath("$.status").isEqualTo("UP");
+          webTestClient
+              .get()
+              .uri("http://localhost:{port}/list", port)
+              .accept(MediaType.APPLICATION_JSON)
+              .exchange()
+              .expectStatus().isOk()
+              .expectBody()
+                .jsonPath("$.length()").isEqualTo(1)
+                .jsonPath("$[0].name").isEqualTo("FOO");
           webTestClient
               .get()
               .uri("http://localhost:{port}/5c73531c-6fc4-426c-adcb-afc5c140a0f7", port)