fix: `ConsumerTaskRunner` waits until the data-loading is finished
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / implementation / kafka / DataChannel.java
index da90663..5a8d494 100644 (file)
@@ -96,7 +96,7 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener
 
       producer.send(record, ((metadata, exception) ->
       {
-        if (metadata != null)
+        if (exception == null)
         {
           // On successful send
           Message message = new Message(key, metadata.offset(), timestamp, text);
@@ -138,6 +138,7 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener
           currentOffset);
 
       consumer.seek(topicPartition, nextOffset[partition]);
+      infoChannel.sendShardAssignedEvent(partition);
     });
 
     consumer.resume(partitions);
@@ -151,6 +152,7 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener
       int partition = topicPartition.partition();
       isShardOwned[partition] = false;
       log.info("Partition revoked: {} - next={}", partition, nextOffset[partition]);
+      infoChannel.sendShardRevokedEvent(partition);
     });
   }