From f41f6af26ac9d01e7307153a6ab7ba02683caca6 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Fri, 15 Sep 2023 20:56:14 +0200 Subject: [PATCH] fix: GREEN - `DataChannel` creates entries for existent chat-rooms --- .../implementation/kafka/DataChannel.java | 21 +++++++++++++------ .../kafka/KafkaServicesConfiguration.java | 6 ++++-- 2 files changed, 19 insertions(+), 8 deletions(-) diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java index 4d5a1412..d2d6f300 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java @@ -36,6 +36,7 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener private final long[] currentOffset; private final long[] nextOffset; private final Map[] chatRoomData; + private final InfoChannel infoChannel; private boolean running; @Getter @@ -49,7 +50,8 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener ZoneId zoneId, int numShards, int bufferSize, - Clock clock) + Clock clock, + InfoChannel infoChannel) { log.debug( "Creating DataChannel for topic {} with {} partitions", @@ -68,10 +70,8 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener this.chatRoomData = new Map[numShards]; IntStream .range(0, numShards) - .forEach(shard -> - { - this.chatRoomData[shard] = new HashMap<>(); - }); + .forEach(shard -> this.chatRoomData[shard] = new HashMap<>()); + this.infoChannel = infoChannel; } @@ -296,6 +296,15 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener return Mono.error(new ShardNotOwnedException(shard)); } - return Mono.justOrEmpty(chatRoomData[shard].get(id)); + return infoChannel + .getChatRoomInfo(id) + .map(chatRoomInfo -> chatRoomData[shard].computeIfAbsent( + id, + (chatRoomId) -> + { + log.info("Creating ChatRoom {} with buffer-size {}", chatRoomId, bufferSize); + KafkaChatMessageService service = new KafkaChatMessageService(this, chatRoomId); + return new ChatRoomData(clock, service, bufferSize); + })); } } diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java index b5bac470..784ffa54 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java @@ -133,7 +133,8 @@ public class KafkaServicesConfiguration Producer producer, Consumer dataChannelConsumer, ZoneId zoneId, - Clock clock) + Clock clock, + InfoChannel infoChannel) { return new DataChannel( properties.getKafka().getDataChannelTopic(), @@ -142,7 +143,8 @@ public class KafkaServicesConfiguration zoneId, properties.getKafka().getNumPartitions(), properties.getChatroomBufferSize(), - clock); + clock, + infoChannel); } @Bean -- 2.20.1