From: Kai Moritz <kai@juplo.de>
Date: Sat, 2 Nov 2024 22:03:45 +0000 (+0100)
Subject: WIP
X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=143dbc5172cb65548be16050255c72d43659b2ec;p=demos%2Fkafka%2Ftraining

WIP
---

diff --git a/src/main/java/de/juplo/kafka/ExampleConsumer.java b/src/main/java/de/juplo/kafka/ExampleConsumer.java
index c63ca7e9..fe6616a1 100644
--- a/src/main/java/de/juplo/kafka/ExampleConsumer.java
+++ b/src/main/java/de/juplo/kafka/ExampleConsumer.java
@@ -144,6 +144,11 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener
             }
           });
 
+        log.debug(
+          "{} - Waiting for {} parties in phase {}...",
+          id,
+          phaser.getRegisteredParties(),
+          phaser.getPhase());
         int arrivedPhase = phaser.arriveAndAwaitAdvance();
         log.info("{} - Phase {} is done! Next phase: {}", id, phase, arrivedPhase);
 
@@ -369,20 +374,37 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener
   @Override
   public void onPartitionsAssigned(Collection<TopicPartition> partitions)
   {
+    log.info("{} - Assigning new partitions", id);
     partitions
       .stream()
       .filter(partition -> partition.topic().equals(topic))
       .forEach(partition -> restoreAndAssign(partition.partition()));
+    log.info("{} - Assignment of new partitions done!", id);
   }
 
   @Override
   public synchronized void onPartitionsRevoked(Collection<TopicPartition> partitions)
   {
+    log.info("{} - Revoking owned partitions", id);
     commit();
     partitions
       .stream()
       .filter(partition -> partition.topic().equals(topic))
       .forEach(partition -> revoke(partition.partition()));
+    log.info("{} - Revokation of previously owned partitions done!", id);
+  }
+
+  @Override
+  public void onPartitionsLost(Collection<TopicPartition> partitions)
+  {
+    log.info("{} - Removing lost partitions", id);
+    producer.abortTransaction();
+    partitions
+      .stream()
+      .filter(partition -> partition.topic().equals(topic))
+      .forEach(partition -> revoke(partition.partition()));
+    producer.beginTransaction();
+    log.info("{} - Removement of lost done!", id);
   }
 
   private void restoreAndAssign(int partition)