X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fchat%2Fbackend%2Fimplementation%2Fkafka%2FDataChannel.java;h=60d0705cf41e30cc39f1da44b6931f7b08e24302;hb=62ed4a1aa2ca1d903ded1678da669dd347c2fdf5;hp=4d5a1412b4b7ef2107c5799810652c13c57fc965;hpb=f3d558866e0f2a1cb002b558f3d094159a31daf5;p=demos%2Fkafka%2Fchat diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java index 4d5a1412..60d0705c 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java @@ -25,6 +25,7 @@ import java.util.stream.IntStream; @Slf4j public class DataChannel implements Runnable, ConsumerRebalanceListener { + private final String instanceId; private final String topic; private final Producer producer; private final Consumer consumer; @@ -36,6 +37,8 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener private final long[] currentOffset; private final long[] nextOffset; private final Map[] chatRoomData; + private final InfoChannel infoChannel; + private final ShardingPublisherStrategy shardingPublisherStrategy; private boolean running; @Getter @@ -43,18 +46,23 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener public DataChannel( + String instanceId, String topic, Producer producer, Consumer dataChannelConsumer, ZoneId zoneId, int numShards, int bufferSize, - Clock clock) + Clock clock, + InfoChannel infoChannel, + ShardingPublisherStrategy shardingPublisherStrategy) { log.debug( - "Creating DataChannel for topic {} with {} partitions", + "{}: Creating DataChannel for topic {} with {} partitions", + instanceId, topic, numShards); + this.instanceId = instanceId; this.topic = topic; this.consumer = dataChannelConsumer; this.producer = producer; @@ -68,10 +76,9 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener this.chatRoomData = new Map[numShards]; IntStream .range(0, numShards) - .forEach(shard -> - { - this.chatRoomData[shard] = new HashMap<>(); - }); + .forEach(shard -> this.chatRoomData[shard] = new HashMap<>()); + this.infoChannel = infoChannel; + this.shardingPublisherStrategy = shardingPublisherStrategy; } @@ -137,6 +144,14 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener currentOffset); consumer.seek(topicPartition, nextOffset[partition]); + infoChannel.sendShardAssignedEvent(partition); + shardingPublisherStrategy + .publishOwnership(partition) + .doOnNext(instanceId -> log.info( + "Instance {} was published as owner of shard {}", + instanceId, + partition)) + .subscribe(); }); consumer.resume(partitions); @@ -149,7 +164,9 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener { int partition = topicPartition.partition(); isShardOwned[partition] = false; + nextOffset[partition] = consumer.position(topicPartition); log.info("Partition revoked: {} - next={}", partition, nextOffset[partition]); + infoChannel.sendShardRevokedEvent(partition); }); } @@ -244,17 +261,17 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener Message.MessageKey key = Message.MessageKey.of(chatMessageTo.getUser(), chatMessageTo.getId()); Message message = new Message(key, offset, timestamp, chatMessageTo.getText()); - ChatRoomData chatRoomData = this.chatRoomData[partition].computeIfAbsent( - chatRoomId, - (id) -> - { - log.info("Creating ChatRoom {} with buffer-size {}", id, bufferSize); - KafkaChatMessageService service = new KafkaChatMessageService(this, id); - return new ChatRoomData(clock, service, bufferSize); - }); + ChatRoomData chatRoomData = this + .chatRoomData[partition] + .computeIfAbsent(chatRoomId, this::computeChatRoomData); KafkaChatMessageService kafkaChatRoomService = (KafkaChatMessageService) chatRoomData.getChatRoomService(); + log.debug( + "Loaded message from partition={} at offset={}: {}", + partition, + offset, + message); kafkaChatRoomService.persistMessage(message); } @@ -263,7 +280,12 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener return IntStream .range(0, numShards) .filter(shard -> isShardOwned[shard]) - .allMatch(shard -> nextOffset[shard] >= currentOffset[shard]); + .allMatch(shard -> + { + TopicPartition partition = new TopicPartition(topic, shard); + long position = consumer.position(partition); + return position >= currentOffset[shard]; + }); } private void pauseAllOwnedPartions() @@ -293,9 +315,19 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener if (!isShardOwned[shard]) { - return Mono.error(new ShardNotOwnedException(shard)); + return Mono.error(new ShardNotOwnedException(instanceId, shard)); } - return Mono.justOrEmpty(chatRoomData[shard].get(id)); + return infoChannel + .getChatRoomInfo(id) + .map(chatRoomInfo -> + chatRoomData[shard].computeIfAbsent(id, this::computeChatRoomData)); + } + + private ChatRoomData computeChatRoomData(UUID chatRoomId) + { + log.info("Creating ChatRoom {} with buffer-size {}", chatRoomId, bufferSize); + KafkaChatMessageService service = new KafkaChatMessageService(this, chatRoomId); + return new ChatRoomData(clock, service, bufferSize); } }