From 660fe128040e6e0d06745cdc01aaa749dc72f41a Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Mon, 28 Oct 2024 10:49:26 +0100 Subject: [PATCH] =?utf8?q?Log-Meldungen=20f=C3=BCr=20das=20Senden=20des=20?= =?utf8?q?Z=C3=A4hlerstands=20erg=C3=A4nzt?= MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit --- src/main/java/de/juplo/kafka/ExampleConsumer.java | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/src/main/java/de/juplo/kafka/ExampleConsumer.java b/src/main/java/de/juplo/kafka/ExampleConsumer.java index 9dafb18..89cfd0a 100644 --- a/src/main/java/de/juplo/kafka/ExampleConsumer.java +++ b/src/main/java/de/juplo/kafka/ExampleConsumer.java @@ -231,12 +231,22 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener public void onPartitionsAssigned(Collection partitions) { phaser.bulkRegister(partitions.size()); + log.info( + "{} - Added {} parties for newly assigned partitions. New total number of parties: {}", + id, + partitions.size(), + phaser.getRegisteredParties()); } @Override public void onPartitionsRevoked(Collection partitions) { partitions.forEach(partition -> phaser.arriveAndDeregister()); + log.info( + "{} - Removed {} parties for revoked partitions. New total number of parties: {}", + id, + partitions.size(), + phaser.getRegisteredParties()); } public void shutdown() throws InterruptedException -- 2.20.1