WIP
authorKai Moritz <kai@juplo.de>
Sat, 2 Nov 2024 22:03:45 +0000 (23:03 +0100)
committerKai Moritz <kai@juplo.de>
Sat, 9 Nov 2024 15:49:53 +0000 (16:49 +0100)
src/main/java/de/juplo/kafka/ExampleConsumer.java

index c63ca7e..fe6616a 100644 (file)
@@ -144,6 +144,11 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener
             }
           });
 
+        log.debug(
+          "{} - Waiting for {} parties in phase {}...",
+          id,
+          phaser.getRegisteredParties(),
+          phaser.getPhase());
         int arrivedPhase = phaser.arriveAndAwaitAdvance();
         log.info("{} - Phase {} is done! Next phase: {}", id, phase, arrivedPhase);
 
@@ -369,20 +374,37 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener
   @Override
   public void onPartitionsAssigned(Collection<TopicPartition> partitions)
   {
+    log.info("{} - Assigning new partitions", id);
     partitions
       .stream()
       .filter(partition -> partition.topic().equals(topic))
       .forEach(partition -> restoreAndAssign(partition.partition()));
+    log.info("{} - Assignment of new partitions done!", id);
   }
 
   @Override
   public synchronized void onPartitionsRevoked(Collection<TopicPartition> partitions)
   {
+    log.info("{} - Revoking owned partitions", id);
     commit();
     partitions
       .stream()
       .filter(partition -> partition.topic().equals(topic))
       .forEach(partition -> revoke(partition.partition()));
+    log.info("{} - Revokation of previously owned partitions done!", id);
+  }
+
+  @Override
+  public void onPartitionsLost(Collection<TopicPartition> partitions)
+  {
+    log.info("{} - Removing lost partitions", id);
+    producer.abortTransaction();
+    partitions
+      .stream()
+      .filter(partition -> partition.topic().equals(topic))
+      .forEach(partition -> revoke(partition.partition()));
+    producer.beginTransaction();
+    log.info("{} - Removement of lost done!", id);
   }
 
   private void restoreAndAssign(int partition)