From 9d0c92f01c4d393b70655615abae808619530aac Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Wed, 10 Jun 2026 14:32:13 +0200 Subject: [PATCH] =?utf8?q?`ConsumerRebalanceListener`=20implementiert,=20d?= =?utf8?q?er=20die=20=C3=84nderungen=20logged?= MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit --- .../java/de/juplo/kafka/ExampleConsumer.java | 31 +++++++++++++++++-- 1 file changed, 29 insertions(+), 2 deletions(-) diff --git a/src/main/java/de/juplo/kafka/ExampleConsumer.java b/src/main/java/de/juplo/kafka/ExampleConsumer.java index 6f4e4e88..36ddc70d 100644 --- a/src/main/java/de/juplo/kafka/ExampleConsumer.java +++ b/src/main/java/de/juplo/kafka/ExampleConsumer.java @@ -1,20 +1,23 @@ package de.juplo.kafka; import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.WakeupException; import org.apache.kafka.common.serialization.StringDeserializer; import java.time.Duration; import java.util.Arrays; +import java.util.Collection; import java.util.Properties; @Slf4j -public class ExampleConsumer +public class ExampleConsumer implements ConsumerRebalanceListener { private final String id; private final String topic; @@ -47,7 +50,7 @@ public class ExampleConsumer try { log.info("{} - Subscribing to topic {}", id, topic); - consumer.subscribe(Arrays.asList(topic)); + consumer.subscribe(Arrays.asList(topic), this); running = true; while (true) @@ -95,6 +98,30 @@ public class ExampleConsumer log.info("{} - partition={}-{}, offset={}: {}={}", id, topic, partition, offset, key, value); } + @Override + public void onPartitionsAssigned(Collection partitions) + { + partitions + .stream() + .forEach(partition -> log.info("{} - partition assigned: {}", id, partition)); + } + + @Override + public void onPartitionsRevoked(Collection partitions) + { + partitions + .stream() + .forEach(partition -> log.info("{} - partition revoked: {}", id, partition)); + } + + @Override + public void onPartitionsLost(Collection partitions) + { + partitions + .stream() + .forEach(partition -> log.info("{} - partition lost: {}", id, partition)); + } + public static void main(String[] args) throws Exception { -- 2.39.5