WIP: shard assigned/revoked events
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / implementation / kafka / DataChannel.java
index da90663..4eedeb4 100644 (file)
@@ -138,6 +138,7 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener
           currentOffset);
 
       consumer.seek(topicPartition, nextOffset[partition]);
+      infoChannel.sendShardAssignedEvent(partition);
     });
 
     consumer.resume(partitions);
@@ -151,6 +152,7 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener
       int partition = topicPartition.partition();
       isShardOwned[partition] = false;
       log.info("Partition revoked: {} - next={}", partition, nextOffset[partition]);
+      infoChannel.sendShardRevokedEvent(partition);
     });
   }