From: Kai Moritz Date: Fri, 15 Sep 2023 18:56:14 +0000 (+0200) Subject: fix: GREEN - `DataChannel` creates entries for existent chat-rooms X-Git-Tag: rebase--2024-01-26--18-11~22 X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=89c85834b1a22c2e0a09d3bae2260caf77e28ffd;p=demos%2Fkafka%2Fchat fix: GREEN - `DataChannel` creates entries for existent chat-rooms --- diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java index 4d5a1412..d2d6f300 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java @@ -36,6 +36,7 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener private final long[] currentOffset; private final long[] nextOffset; private final Map[] chatRoomData; + private final InfoChannel infoChannel; private boolean running; @Getter @@ -49,7 +50,8 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener ZoneId zoneId, int numShards, int bufferSize, - Clock clock) + Clock clock, + InfoChannel infoChannel) { log.debug( "Creating DataChannel for topic {} with {} partitions", @@ -68,10 +70,8 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener this.chatRoomData = new Map[numShards]; IntStream .range(0, numShards) - .forEach(shard -> - { - this.chatRoomData[shard] = new HashMap<>(); - }); + .forEach(shard -> this.chatRoomData[shard] = new HashMap<>()); + this.infoChannel = infoChannel; } @@ -296,6 +296,15 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener return Mono.error(new ShardNotOwnedException(shard)); } - return Mono.justOrEmpty(chatRoomData[shard].get(id)); + return infoChannel + .getChatRoomInfo(id) + .map(chatRoomInfo -> chatRoomData[shard].computeIfAbsent( + id, + (chatRoomId) -> + { + log.info("Creating ChatRoom {} with buffer-size {}", chatRoomId, bufferSize); + KafkaChatMessageService service = new KafkaChatMessageService(this, chatRoomId); + return new ChatRoomData(clock, service, bufferSize); + })); } } diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java index b5bac470..784ffa54 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java @@ -133,7 +133,8 @@ public class KafkaServicesConfiguration Producer producer, Consumer dataChannelConsumer, ZoneId zoneId, - Clock clock) + Clock clock, + InfoChannel infoChannel) { return new DataChannel( properties.getKafka().getDataChannelTopic(), @@ -142,7 +143,8 @@ public class KafkaServicesConfiguration zoneId, properties.getKafka().getNumPartitions(), properties.getChatroomBufferSize(), - clock); + clock, + infoChannel); } @Bean