From 9b0879d7ef3a4811ef48fb9190558937a07f1194 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Fri, 15 Sep 2023 20:56:14 +0200 Subject: [PATCH] fix: GREEN - `DataChannel` creates entries for existent chat-rooms --- .../implementation/kafka/DataChannel.java | 21 +++++++++++++------ .../implementation/kafka/InfoChannel.java | 2 +- .../kafka/KafkaServicesConfiguration.java | 6 ++++-- .../mongodb/MongoDbStorageStrategy.java | 2 +- 4 files changed, 21 insertions(+), 10 deletions(-) diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java index 4d5a1412..d2d6f300 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java @@ -36,6 +36,7 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener private final long[] currentOffset; private final long[] nextOffset; private final Map[] chatRoomData; + private final InfoChannel infoChannel; private boolean running; @Getter @@ -49,7 +50,8 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener ZoneId zoneId, int numShards, int bufferSize, - Clock clock) + Clock clock, + InfoChannel infoChannel) { log.debug( "Creating DataChannel for topic {} with {} partitions", @@ -68,10 +70,8 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener this.chatRoomData = new Map[numShards]; IntStream .range(0, numShards) - .forEach(shard -> - { - this.chatRoomData[shard] = new HashMap<>(); - }); + .forEach(shard -> this.chatRoomData[shard] = new HashMap<>()); + this.infoChannel = infoChannel; } @@ -296,6 +296,15 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener return Mono.error(new ShardNotOwnedException(shard)); } - return Mono.justOrEmpty(chatRoomData[shard].get(id)); + return infoChannel + .getChatRoomInfo(id) + .map(chatRoomInfo -> chatRoomData[shard].computeIfAbsent( + id, + (chatRoomId) -> + { + log.info("Creating ChatRoom {} with buffer-size {}", chatRoomId, bufferSize); + KafkaChatMessageService service = new KafkaChatMessageService(this, chatRoomId); + return new ChatRoomData(clock, service, bufferSize); + })); } } diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/InfoChannel.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/InfoChannel.java index ad03f0df..26e86963 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/InfoChannel.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/InfoChannel.java @@ -84,7 +84,7 @@ public class InfoChannel implements Runnable if (metadata != null) { log.info("Successfully sent chreate-request for chat room: {}", to); - ChatRoomInfo chatRoomInfo = new ChatRoomInfo(chatRoomId, name, record.partition()); + ChatRoomInfo chatRoomInfo = new ChatRoomInfo(chatRoomId, name, shard); sink.success(chatRoomInfo); } else diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java index b5bac470..784ffa54 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java @@ -133,7 +133,8 @@ public class KafkaServicesConfiguration Producer producer, Consumer dataChannelConsumer, ZoneId zoneId, - Clock clock) + Clock clock, + InfoChannel infoChannel) { return new DataChannel( properties.getKafka().getDataChannelTopic(), @@ -142,7 +143,8 @@ public class KafkaServicesConfiguration zoneId, properties.getKafka().getNumPartitions(), properties.getChatroomBufferSize(), - clock); + clock, + infoChannel); } @Bean diff --git a/src/main/java/de/juplo/kafka/chat/backend/storage/mongodb/MongoDbStorageStrategy.java b/src/main/java/de/juplo/kafka/chat/backend/storage/mongodb/MongoDbStorageStrategy.java index 3eb90960..b1bead9b 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/storage/mongodb/MongoDbStorageStrategy.java +++ b/src/main/java/de/juplo/kafka/chat/backend/storage/mongodb/MongoDbStorageStrategy.java @@ -36,7 +36,7 @@ public class MongoDbStorageStrategy implements StorageStrategy .map(chatRoomTo -> { UUID chatRoomId = UUID.fromString(chatRoomTo.getId()); - return new ChatRoomInfo(chatRoomId, chatRoomTo.getName()); + return new ChatRoomInfo(chatRoomId, chatRoomTo.getName(), null); }); } -- 2.20.1