X-Git-Url: http://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fchat%2Fbackend%2Fimplementation%2Fkafka%2FDataChannel.java;h=9cafbaaad2b6afd93f3ec6bc02cbdc6d7956a156;hb=ec456b2c00027e54a49f3d916e89c831b2589186;hp=d2d6f3002e2e5f8ffe7ffafb0f3d3152848c39eb;hpb=8b682010e0052a5c036814e64e5eb94122ed52ce;p=demos%2Fkafka%2Fchat diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java index d2d6f300..9cafbaaa 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java @@ -25,6 +25,7 @@ import java.util.stream.IntStream; @Slf4j public class DataChannel implements Runnable, ConsumerRebalanceListener { + private final String instanceId; private final String topic; private final Producer producer; private final Consumer consumer; @@ -44,6 +45,7 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener public DataChannel( + String instanceId, String topic, Producer producer, Consumer dataChannelConsumer, @@ -54,9 +56,11 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener InfoChannel infoChannel) { log.debug( - "Creating DataChannel for topic {} with {} partitions", + "{}: Creating DataChannel for topic {} with {} partitions", + instanceId, topic, numShards); + this.instanceId = instanceId; this.topic = topic; this.consumer = dataChannelConsumer; this.producer = producer; @@ -95,7 +99,7 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener producer.send(record, ((metadata, exception) -> { - if (metadata != null) + if (exception == null) { // On successful send Message message = new Message(key, metadata.offset(), timestamp, text); @@ -137,6 +141,7 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener currentOffset); consumer.seek(topicPartition, nextOffset[partition]); + infoChannel.sendShardAssignedEvent(partition); }); consumer.resume(partitions); @@ -150,6 +155,7 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener int partition = topicPartition.partition(); isShardOwned[partition] = false; log.info("Partition revoked: {} - next={}", partition, nextOffset[partition]); + infoChannel.sendShardRevokedEvent(partition); }); } @@ -244,14 +250,9 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener Message.MessageKey key = Message.MessageKey.of(chatMessageTo.getUser(), chatMessageTo.getId()); Message message = new Message(key, offset, timestamp, chatMessageTo.getText()); - ChatRoomData chatRoomData = this.chatRoomData[partition].computeIfAbsent( - chatRoomId, - (id) -> - { - log.info("Creating ChatRoom {} with buffer-size {}", id, bufferSize); - KafkaChatMessageService service = new KafkaChatMessageService(this, id); - return new ChatRoomData(clock, service, bufferSize); - }); + ChatRoomData chatRoomData = this + .chatRoomData[partition] + .computeIfAbsent(chatRoomId, this::computeChatRoomData); KafkaChatMessageService kafkaChatRoomService = (KafkaChatMessageService) chatRoomData.getChatRoomService(); @@ -293,18 +294,19 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener if (!isShardOwned[shard]) { - return Mono.error(new ShardNotOwnedException(shard)); + return Mono.error(new ShardNotOwnedException(instanceId, shard)); } return infoChannel .getChatRoomInfo(id) - .map(chatRoomInfo -> chatRoomData[shard].computeIfAbsent( - id, - (chatRoomId) -> - { - log.info("Creating ChatRoom {} with buffer-size {}", chatRoomId, bufferSize); - KafkaChatMessageService service = new KafkaChatMessageService(this, chatRoomId); - return new ChatRoomData(clock, service, bufferSize); - })); + .map(chatRoomInfo -> + chatRoomData[shard].computeIfAbsent(id, this::computeChatRoomData)); + } + + private ChatRoomData computeChatRoomData(UUID chatRoomId) + { + log.info("Creating ChatRoom {} with buffer-size {}", chatRoomId, bufferSize); + KafkaChatMessageService service = new KafkaChatMessageService(this, chatRoomId); + return new ChatRoomData(clock, service, bufferSize); } }