package de.juplo.kafka;
import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Arrays;
+import java.util.Collection;
import java.util.Properties;
@Slf4j
-public class ExampleConsumer
+public class ExampleConsumer implements ConsumerRebalanceListener
{
private final String id;
private final String topic;
try
{
log.info("{} - Subscribing to topic {}", id, topic);
- consumer.subscribe(Arrays.asList(topic));
+ consumer.subscribe(Arrays.asList(topic), this);
running = true;
while (true)
log.info("{} - partition={}-{}, offset={}: {}={}", id, topic, partition, offset, key, value);
}
+ @Override
+ public void onPartitionsAssigned(Collection<TopicPartition> partitions)
+ {
+ partitions
+ .stream()
+ .forEach(partition -> log.info("{} - partition assigned: {}", id, partition));
+ }
+
+ @Override
+ public void onPartitionsRevoked(Collection<TopicPartition> partitions)
+ {
+ partitions
+ .stream()
+ .forEach(partition -> log.info("{} - partition revoked: {}", id, partition));
+ }
+
+ @Override
+ public void onPartitionsLost(Collection<TopicPartition> partitions)
+ {
+ partitions
+ .stream()
+ .forEach(partition -> log.info("{} - partition lost: {}", id, partition));
+ }
+
public static void main(String[] args) throws Exception
{