From 143dbc5172cb65548be16050255c72d43659b2ec Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sat, 2 Nov 2024 23:03:45 +0100 Subject: [PATCH] WIP --- .../java/de/juplo/kafka/ExampleConsumer.java | 22 +++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/src/main/java/de/juplo/kafka/ExampleConsumer.java b/src/main/java/de/juplo/kafka/ExampleConsumer.java index c63ca7e..fe6616a 100644 --- a/src/main/java/de/juplo/kafka/ExampleConsumer.java +++ b/src/main/java/de/juplo/kafka/ExampleConsumer.java @@ -144,6 +144,11 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener } }); + log.debug( + "{} - Waiting for {} parties in phase {}...", + id, + phaser.getRegisteredParties(), + phaser.getPhase()); int arrivedPhase = phaser.arriveAndAwaitAdvance(); log.info("{} - Phase {} is done! Next phase: {}", id, phase, arrivedPhase); @@ -369,20 +374,37 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener @Override public void onPartitionsAssigned(Collection partitions) { + log.info("{} - Assigning new partitions", id); partitions .stream() .filter(partition -> partition.topic().equals(topic)) .forEach(partition -> restoreAndAssign(partition.partition())); + log.info("{} - Assignment of new partitions done!", id); } @Override public synchronized void onPartitionsRevoked(Collection partitions) { + log.info("{} - Revoking owned partitions", id); commit(); partitions .stream() .filter(partition -> partition.topic().equals(topic)) .forEach(partition -> revoke(partition.partition())); + log.info("{} - Revokation of previously owned partitions done!", id); + } + + @Override + public void onPartitionsLost(Collection partitions) + { + log.info("{} - Removing lost partitions", id); + producer.abortTransaction(); + partitions + .stream() + .filter(partition -> partition.topic().equals(topic)) + .forEach(partition -> revoke(partition.partition())); + producer.beginTransaction(); + log.info("{} - Removement of lost done!", id); } private void restoreAndAssign(int partition) -- 2.20.1