X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fchat%2Fbackend%2Fimplementation%2Fkafka%2FDataChannel.java;h=5a8d49446d5c0f61bc7d5d3393269a7b3552433a;hb=6045ac97a24bef487d0ba09d02a5dc49c0a25af4;hp=da906631be6469fa8902d2434645cdb726c4309f;hpb=5c75656653726cdbd8caea31c40c98b330f765eb;p=demos%2Fkafka%2Fchat diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java index da906631..5a8d4944 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java @@ -96,7 +96,7 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener producer.send(record, ((metadata, exception) -> { - if (metadata != null) + if (exception == null) { // On successful send Message message = new Message(key, metadata.offset(), timestamp, text); @@ -138,6 +138,7 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener currentOffset); consumer.seek(topicPartition, nextOffset[partition]); + infoChannel.sendShardAssignedEvent(partition); }); consumer.resume(partitions); @@ -151,6 +152,7 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener int partition = topicPartition.partition(); isShardOwned[partition] = false; log.info("Partition revoked: {} - next={}", partition, nextOffset[partition]); + infoChannel.sendShardRevokedEvent(partition); }); }