1 package de.juplo.kafka;
3 import lombok.RequiredArgsConstructor;
4 import lombok.extern.slf4j.Slf4j;
5 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
6 import org.apache.kafka.common.TopicPartition;
8 import java.util.Collection;
9 import java.util.HashMap;
13 @RequiredArgsConstructor
15 public class KeyCountingRebalanceListener implements ConsumerRebalanceListener
17 private final KeyCountingRecordHandler handler;
18 private final String id;
21 public void onPartitionsAssigned(Collection<TopicPartition> partitions)
23 partitions.forEach(tp ->
25 Integer partition = tp.partition();
26 log.info("{} - adding partition: {}", id, partition);
27 handler.addPartition(partition, new HashMap<>());
32 public void onPartitionsRevoked(Collection<TopicPartition> partitions)
34 partitions.forEach(tp ->
36 Integer partition = tp.partition();
37 log.info("{} - removing partition: {}", id, partition);
38 Map<String, Long> removed = handler.removePartition(partition);
39 for (String key : removed.keySet())
42 "{} - Seen {} messages for partition={}|key={}",