From: Kai Moritz Date: Wed, 10 Jun 2026 12:32:13 +0000 (+0200) Subject: `ConsumerRebalanceListener` implementiert, der die Änderungen logged X-Git-Tag: grundlagen/simple-consumer--2026-06-lvm--rebase-vollständig X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=9d0c92f01c4d393b70655615abae808619530aac;p=demos%2Fkafka%2Ftraining `ConsumerRebalanceListener` implementiert, der die Änderungen logged --- diff --git a/src/main/java/de/juplo/kafka/ExampleConsumer.java b/src/main/java/de/juplo/kafka/ExampleConsumer.java index 6f4e4e88..36ddc70d 100644 --- a/src/main/java/de/juplo/kafka/ExampleConsumer.java +++ b/src/main/java/de/juplo/kafka/ExampleConsumer.java @@ -1,20 +1,23 @@ package de.juplo.kafka; import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.WakeupException; import org.apache.kafka.common.serialization.StringDeserializer; import java.time.Duration; import java.util.Arrays; +import java.util.Collection; import java.util.Properties; @Slf4j -public class ExampleConsumer +public class ExampleConsumer implements ConsumerRebalanceListener { private final String id; private final String topic; @@ -47,7 +50,7 @@ public class ExampleConsumer try { log.info("{} - Subscribing to topic {}", id, topic); - consumer.subscribe(Arrays.asList(topic)); + consumer.subscribe(Arrays.asList(topic), this); running = true; while (true) @@ -95,6 +98,30 @@ public class ExampleConsumer log.info("{} - partition={}-{}, offset={}: {}={}", id, topic, partition, offset, key, value); } + @Override + public void onPartitionsAssigned(Collection partitions) + { + partitions + .stream() + .forEach(partition -> log.info("{} - partition assigned: {}", id, partition)); + } + + @Override + public void onPartitionsRevoked(Collection partitions) + { + partitions + .stream() + .forEach(partition -> log.info("{} - partition revoked: {}", id, partition)); + } + + @Override + public void onPartitionsLost(Collection partitions) + { + partitions + .stream() + .forEach(partition -> log.info("{} - partition lost: {}", id, partition)); + } + public static void main(String[] args) throws Exception {