1 package de.juplo.kafka;
3 import lombok.RequiredArgsConstructor;
4 import lombok.extern.slf4j.Slf4j;
5 import org.apache.kafka.common.TopicPartition;
7 import java.time.Clock;
8 import java.time.Duration;
9 import java.time.Instant;
10 import java.util.Collection;
14 @RequiredArgsConstructor
16 public class ApplicationRebalanceListener implements PollIntervalAwareConsumerRebalanceListener
18 private final ApplicationRecordHandler recordHandler;
19 private final StateRepository stateRepository;
20 private final String id;
21 private final Clock clock;
22 private final Duration commitInterval;
24 private Instant lastCommit = Instant.EPOCH;
27 public void onPartitionsAssigned(Collection<TopicPartition> partitions)
29 partitions.forEach(tp ->
31 Integer partition = tp.partition();
32 log.info("{} - adding partition: {}", id, partition);
33 StateDocument document =
35 .findById(Integer.toString(partition))
36 .orElse(new StateDocument(partition));
37 recordHandler.addPartition(partition, document.state);
42 public void onPartitionsRevoked(Collection<TopicPartition> partitions)
44 partitions.forEach(tp ->
46 Integer partition = tp.partition();
47 log.info("{} - removing partition: {}", id, partition);
48 Map<String, Long> removed = recordHandler.removePartition(partition);
49 for (String key : removed.keySet())
52 "{} - Seen {} messages for partition={}|key={}",
58 stateRepository.save(new StateDocument(partition, removed));
64 public void beforeNextPoll()
66 if (lastCommit.plus(commitInterval).isBefore(clock.instant()))
68 log.debug("Storing data, last commit: {}", lastCommit);
69 recordHandler.getState().forEach((partiton, adder) -> stateRepository.save(
73 lastCommit = clock.instant();