X-Git-Url: http://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fchat%2Fbackend%2Fimplementation%2Fkafka%2FDataChannel.java;h=c13f713e4ace91661795efd731fd74f5bf4df99c;hb=14facb2600def58d0c5f6c83e4d608b54808854a;hp=9cafbaaad2b6afd93f3ec6bc02cbdc6d7956a156;hpb=ec456b2c00027e54a49f3d916e89c831b2589186;p=demos%2Fkafka%2Fchat diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java index 9cafbaaa..c13f713e 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java @@ -38,6 +38,7 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener private final long[] nextOffset; private final Map[] chatRoomData; private final InfoChannel infoChannel; + private final ShardingPublisherStrategy shardingPublisherStrategy; private boolean running; @Getter @@ -53,7 +54,8 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener int numShards, int bufferSize, Clock clock, - InfoChannel infoChannel) + InfoChannel infoChannel, + ShardingPublisherStrategy shardingPublisherStrategy) { log.debug( "{}: Creating DataChannel for topic {} with {} partitions", @@ -76,6 +78,7 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener .range(0, numShards) .forEach(shard -> this.chatRoomData[shard] = new HashMap<>()); this.infoChannel = infoChannel; + this.shardingPublisherStrategy = shardingPublisherStrategy; } @@ -142,6 +145,18 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener consumer.seek(topicPartition, nextOffset[partition]); infoChannel.sendShardAssignedEvent(partition); + shardingPublisherStrategy + .publishOwnership(partition) + .doOnSuccess(instanceId -> log.info( + "Successfully published instance {} as owner of shard {}", + instanceId, + partition)) + .doOnError(throwable -> log.error( + "Could not publish instance {} as owner of shard {}: {}", + instanceId, + partition, + throwable)) + .block(); }); consumer.resume(partitions); @@ -256,6 +271,11 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener KafkaChatMessageService kafkaChatRoomService = (KafkaChatMessageService) chatRoomData.getChatRoomService(); + log.debug( + "Loaded message from partition={} at offset={}: {}", + partition, + offset, + message); kafkaChatRoomService.persistMessage(message); } @@ -264,7 +284,12 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener return IntStream .range(0, numShards) .filter(shard -> isShardOwned[shard]) - .allMatch(shard -> nextOffset[shard] >= currentOffset[shard]); + .allMatch(shard -> + { + TopicPartition partition = new TopicPartition(topic, shard); + long position = consumer.position(partition); + return position >= currentOffset[shard]; + }); } private void pauseAllOwnedPartions()