From: Kai Moritz Date: Sat, 14 Jan 2023 17:09:46 +0000 (+0100) Subject: fix: Fixed a NPE in `ShardedChatHome.getChatRooms()` X-Git-Tag: wip-sharding~22 X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=daca33d027e4c0d036fc2aa7c3d9b2120f3ad98a;p=demos%2Fkafka%2Fchat fix: Fixed a NPE in `ShardedChatHome.getChatRooms()` - When collecting the `ChatRoom`s for all shards, unused shards with a `null` value were not skipped. - Also added log-messages of level `INFO` for the creation of `ChatRoom`, `SimpleChatHome` and `ShardedChatHome`. --- diff --git a/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoom.java b/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoom.java index 1c21fb97..cc5c5a07 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoom.java +++ b/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoom.java @@ -38,6 +38,7 @@ public class ChatRoom ChatRoomService service, int bufferSize) { + log.info("Created ChatRoom {} with buffer-size {}", id, bufferSize); this.id = id; this.name = name; this.clock = clock; diff --git a/src/main/java/de/juplo/kafka/chat/backend/domain/ShardedChatHome.java b/src/main/java/de/juplo/kafka/chat/backend/domain/ShardedChatHome.java index ffa7860a..3023f782 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/domain/ShardedChatHome.java +++ b/src/main/java/de/juplo/kafka/chat/backend/domain/ShardedChatHome.java @@ -1,17 +1,40 @@ package de.juplo.kafka.chat.backend.domain; -import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import java.util.HashSet; +import java.util.Set; import java.util.UUID; +import java.util.stream.Collectors; -@RequiredArgsConstructor +@Slf4j public class ShardedChatHome implements ChatHome { private final ChatHome[] chatHomes; - private final ShardingStrategy selectionStrategy; + private final Set ownedShards; + private final ShardingStrategy shardingStrategy; + + + public ShardedChatHome( + ChatHome[] chatHomes, + ShardingStrategy shardingStrategy) + { + this.chatHomes = chatHomes; + this.shardingStrategy = shardingStrategy; + this.ownedShards = new HashSet<>(); + for (int shard = 0; shard < chatHomes.length; shard++) + if(chatHomes[shard] != null) + this.ownedShards.add(shard); + log.info( + "Created ShardedChatHome for shards: {}", + ownedShards + .stream() + .map(String::valueOf) + .collect(Collectors.joining(", "))); + } @Override @@ -30,13 +53,13 @@ public class ShardedChatHome implements ChatHome public Flux getChatRooms() { return Flux - .fromArray(chatHomes) - .flatMap(chatHome -> chatHome.getChatRooms()); + .fromIterable(ownedShards) + .flatMap(shard -> chatHomes[shard].getChatRooms()); } private int selectShard(UUID chatroomId) { - return selectionStrategy.selectShard(chatroomId); + return shardingStrategy.selectShard(chatroomId); } } diff --git a/src/main/java/de/juplo/kafka/chat/backend/domain/SimpleChatHome.java b/src/main/java/de/juplo/kafka/chat/backend/domain/SimpleChatHome.java index b15eab3b..46802c69 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/domain/SimpleChatHome.java +++ b/src/main/java/de/juplo/kafka/chat/backend/domain/SimpleChatHome.java @@ -1,6 +1,5 @@ package de.juplo.kafka.chat.backend.domain; -import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -9,13 +8,19 @@ import java.util.*; @Slf4j -@RequiredArgsConstructor public class SimpleChatHome implements ChatHome { private final ChatHomeService service; private final int shard; + public SimpleChatHome(ChatHomeService service, int shard) + { + log.info("Created SimpleChatHome for shard {}", shard); + this.service = service; + this.shard = shard; + } + public SimpleChatHome(ChatHomeService service) { this(service, 0); diff --git a/src/test/java/de/juplo/kafka/chat/backend/AbstractConfigurationIT.java b/src/test/java/de/juplo/kafka/chat/backend/AbstractConfigurationIT.java index 8c309b84..c424b294 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/AbstractConfigurationIT.java +++ b/src/test/java/de/juplo/kafka/chat/backend/AbstractConfigurationIT.java @@ -33,6 +33,15 @@ public abstract class AbstractConfigurationIT .exchange() .expectStatus().isOk() .expectBody().jsonPath("$.status").isEqualTo("UP"); + webTestClient + .get() + .uri("http://localhost:{port}/list", port) + .accept(MediaType.APPLICATION_JSON) + .exchange() + .expectStatus().isOk() + .expectBody() + .jsonPath("$.length()").isEqualTo(1) + .jsonPath("$[0].name").isEqualTo("FOO"); webTestClient .get() .uri("http://localhost:{port}/5c73531c-6fc4-426c-adcb-afc5c140a0f7", port)