feat: Introduced events that are send, if a shard is assigned/revoked
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / implementation / kafka / DataChannel.java
index 381152b..5a8d494 100644 (file)
@@ -138,6 +138,7 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener
           currentOffset);
 
       consumer.seek(topicPartition, nextOffset[partition]);
+      infoChannel.sendShardAssignedEvent(partition);
     });
 
     consumer.resume(partitions);
@@ -151,6 +152,7 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener
       int partition = topicPartition.partition();
       isShardOwned[partition] = false;
       log.info("Partition revoked: {} - next={}", partition, nextOffset[partition]);
+      infoChannel.sendShardRevokedEvent(partition);
     });
   }