X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fchat%2Fbackend%2Fimplementation%2Fkafka%2FDataChannel.java;h=532a3c12cadd7368bab4b65a5ea15b778e751567;hb=f70bb25f9b0769068028348995a78ece2130cd35;hp=4eedeb4a4fc1c297ca38d71bdd615d10f9d2f419;hpb=a4c69f2736204751a70e916daf451ed6eb7b2994;p=demos%2Fkafka%2Fchat diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java index 4eedeb4a..532a3c12 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java @@ -19,13 +19,13 @@ import reactor.core.publisher.Mono; import java.time.*; import java.util.*; -import java.util.function.Function; import java.util.stream.IntStream; @Slf4j public class DataChannel implements Runnable, ConsumerRebalanceListener { + private final String instanceId; private final String topic; private final Producer producer; private final Consumer consumer; @@ -38,6 +38,7 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener private final long[] nextOffset; private final Map[] chatRoomData; private final InfoChannel infoChannel; + private final ShardingPublisherStrategy shardingPublisherStrategy; private boolean running; @Getter @@ -45,6 +46,7 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener public DataChannel( + String instanceId, String topic, Producer producer, Consumer dataChannelConsumer, @@ -52,12 +54,15 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener int numShards, int bufferSize, Clock clock, - InfoChannel infoChannel) + InfoChannel infoChannel, + ShardingPublisherStrategy shardingPublisherStrategy) { log.debug( - "Creating DataChannel for topic {} with {} partitions", + "{}: Creating DataChannel for topic {} with {} partitions", + instanceId, topic, numShards); + this.instanceId = instanceId; this.topic = topic; this.consumer = dataChannelConsumer; this.producer = producer; @@ -73,6 +78,7 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener .range(0, numShards) .forEach(shard -> this.chatRoomData[shard] = new HashMap<>()); this.infoChannel = infoChannel; + this.shardingPublisherStrategy = shardingPublisherStrategy; } @@ -139,6 +145,13 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener consumer.seek(topicPartition, nextOffset[partition]); infoChannel.sendShardAssignedEvent(partition); + shardingPublisherStrategy + .publishOwnership(partition) + .doOnNext(instanceId -> log.info( + "Instance {} was published as owner of shard {}", + instanceId, + partition)) + .subscribe(); }); consumer.resume(partitions); @@ -291,7 +304,7 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener if (!isShardOwned[shard]) { - return Mono.error(new ShardNotOwnedException(shard)); + return Mono.error(new ShardNotOwnedException(instanceId, shard)); } return infoChannel