1 package de.juplo.kafka;
3 import lombok.RequiredArgsConstructor;
4 import lombok.extern.slf4j.Slf4j;
5 import org.apache.kafka.clients.consumer.Consumer;
6 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
7 import org.apache.kafka.common.TopicPartition;
9 import java.util.Collection;
13 @RequiredArgsConstructor
15 public class WordcountRebalanceListener implements ConsumerRebalanceListener
17 private final WordcountRecordHandler handler;
18 private final PartitionStatisticsRepository repository;
19 private final String id;
20 private final Consumer<String, String> consumer;
24 public void onPartitionsAssigned(Collection<TopicPartition> partitions)
26 partitions.forEach(tp ->
28 Integer partition = tp.partition();
29 Long offset = consumer.position(tp);
30 log.info("{} - adding partition: {}, offset={}", id, partition, offset);
31 StatisticsDocument document =
33 .findById(Integer.toString(partition))
34 .orElse(new StatisticsDocument(partition));
35 if (document.offset >= 0)
37 // Only seek, if a stored offset was found
38 // Otherwise: Use initial offset, generated by Kafka
39 consumer.seek(tp, document.offset);
41 handler.addPartition(partition, document.statistics);
46 public void onPartitionsRevoked(Collection<TopicPartition> partitions)
48 partitions.forEach(tp ->
50 Integer partition = tp.partition();
51 Long newOffset = consumer.position(tp);
53 "{} - removing partition: {}, offset of next message {})",
57 Map<String, Map<String, Long>> removed = handler.removePartition(partition);
58 repository.save(new StatisticsDocument(partition, removed, consumer.position(tp)));