X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fchat%2Fbackend%2Fimplementation%2Fkafka%2FDataChannel.java;h=d2d6f3002e2e5f8ffe7ffafb0f3d3152848c39eb;hb=9b0879d7ef3a4811ef48fb9190558937a07f1194;hp=4d5a1412b4b7ef2107c5799810652c13c57fc965;hpb=3e0bfb7923756c0e7d9535cb8919790d11c1f130;p=demos%2Fkafka%2Fchat diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java index 4d5a1412..d2d6f300 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java @@ -36,6 +36,7 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener private final long[] currentOffset; private final long[] nextOffset; private final Map[] chatRoomData; + private final InfoChannel infoChannel; private boolean running; @Getter @@ -49,7 +50,8 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener ZoneId zoneId, int numShards, int bufferSize, - Clock clock) + Clock clock, + InfoChannel infoChannel) { log.debug( "Creating DataChannel for topic {} with {} partitions", @@ -68,10 +70,8 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener this.chatRoomData = new Map[numShards]; IntStream .range(0, numShards) - .forEach(shard -> - { - this.chatRoomData[shard] = new HashMap<>(); - }); + .forEach(shard -> this.chatRoomData[shard] = new HashMap<>()); + this.infoChannel = infoChannel; } @@ -296,6 +296,15 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener return Mono.error(new ShardNotOwnedException(shard)); } - return Mono.justOrEmpty(chatRoomData[shard].get(id)); + return infoChannel + .getChatRoomInfo(id) + .map(chatRoomInfo -> chatRoomData[shard].computeIfAbsent( + id, + (chatRoomId) -> + { + log.info("Creating ChatRoom {} with buffer-size {}", chatRoomId, bufferSize); + KafkaChatMessageService service = new KafkaChatMessageService(this, chatRoomId); + return new ChatRoomData(clock, service, bufferSize); + })); } }