X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fchat%2Fbackend%2Fimplementation%2Fkafka%2FDataChannel.java;h=5a8d49446d5c0f61bc7d5d3393269a7b3552433a;hb=6045ac97a24bef487d0ba09d02a5dc49c0a25af4;hp=d2d6f3002e2e5f8ffe7ffafb0f3d3152848c39eb;hpb=e65e37f71ee0a81cb505714f3749d4f286e42b57;p=demos%2Fkafka%2Fchat diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java index d2d6f300..5a8d4944 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java @@ -19,6 +19,7 @@ import reactor.core.publisher.Mono; import java.time.*; import java.util.*; +import java.util.function.Function; import java.util.stream.IntStream; @@ -95,7 +96,7 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener producer.send(record, ((metadata, exception) -> { - if (metadata != null) + if (exception == null) { // On successful send Message message = new Message(key, metadata.offset(), timestamp, text); @@ -137,6 +138,7 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener currentOffset); consumer.seek(topicPartition, nextOffset[partition]); + infoChannel.sendShardAssignedEvent(partition); }); consumer.resume(partitions); @@ -150,6 +152,7 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener int partition = topicPartition.partition(); isShardOwned[partition] = false; log.info("Partition revoked: {} - next={}", partition, nextOffset[partition]); + infoChannel.sendShardRevokedEvent(partition); }); } @@ -244,14 +247,9 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener Message.MessageKey key = Message.MessageKey.of(chatMessageTo.getUser(), chatMessageTo.getId()); Message message = new Message(key, offset, timestamp, chatMessageTo.getText()); - ChatRoomData chatRoomData = this.chatRoomData[partition].computeIfAbsent( - chatRoomId, - (id) -> - { - log.info("Creating ChatRoom {} with buffer-size {}", id, bufferSize); - KafkaChatMessageService service = new KafkaChatMessageService(this, id); - return new ChatRoomData(clock, service, bufferSize); - }); + ChatRoomData chatRoomData = this + .chatRoomData[partition] + .computeIfAbsent(chatRoomId, this::computeChatRoomData); KafkaChatMessageService kafkaChatRoomService = (KafkaChatMessageService) chatRoomData.getChatRoomService(); @@ -298,13 +296,14 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener return infoChannel .getChatRoomInfo(id) - .map(chatRoomInfo -> chatRoomData[shard].computeIfAbsent( - id, - (chatRoomId) -> - { - log.info("Creating ChatRoom {} with buffer-size {}", chatRoomId, bufferSize); - KafkaChatMessageService service = new KafkaChatMessageService(this, chatRoomId); - return new ChatRoomData(clock, service, bufferSize); - })); + .map(chatRoomInfo -> + chatRoomData[shard].computeIfAbsent(id, this::computeChatRoomData)); + } + + private ChatRoomData computeChatRoomData(UUID chatRoomId) + { + log.info("Creating ChatRoom {} with buffer-size {}", chatRoomId, bufferSize); + KafkaChatMessageService service = new KafkaChatMessageService(this, chatRoomId); + return new ChatRoomData(clock, service, bufferSize); } }