1 package de.juplo.kafka;
3 import lombok.RequiredArgsConstructor;
4 import lombok.extern.slf4j.Slf4j;
5 import org.apache.kafka.common.TopicPartition;
7 import java.time.Clock;
8 import java.time.Duration;
9 import java.time.Instant;
13 @RequiredArgsConstructor
15 public class ApplicationRebalanceListener implements PollIntervalAwareConsumerRebalanceListener
17 private final ApplicationRecordHandler recordHandler;
18 private final AdderResults adderResults;
19 private final StateRepository stateRepository;
20 private final String id;
21 private final Clock clock;
22 private final Duration commitInterval;
24 private final Set<Integer> partitions = new HashSet<>();
26 private Instant lastCommit = Instant.EPOCH;
29 public void onPartitionsAssigned(Collection<TopicPartition> partitions)
31 partitions.forEach(tp ->
33 Integer partition = tp.partition();
34 log.info("{} - adding partition: {}", id, partition);
35 this.partitions.add(partition);
36 StateDocument document =
38 .findById(Integer.toString(partition))
39 .orElse(new StateDocument(partition));
40 recordHandler.addPartition(partition, document.state);
41 for (String user : document.state.keySet())
44 "{} - Restored state for partition={}|user={}: {}",
48 document.state.get(user));
50 adderResults.addPartition(partition, document.results);
55 public void onPartitionsRevoked(Collection<TopicPartition> partitions)
57 partitions.forEach(tp ->
59 Integer partition = tp.partition();
60 log.info("{} - removing partition: {}", id, partition);
61 this.partitions.remove(partition);
62 Map<String, AdderResult> state = recordHandler.removePartition(partition);
63 for (String user : state.keySet())
66 "{} - Saved state for partition={}|user={}: {}",
72 Map<String, List<AdderResult>> results = adderResults.removePartition(partition);
73 stateRepository.save(new StateDocument(partition, state, results));
79 public void beforeNextPoll()
81 if (lastCommit.plus(commitInterval).isBefore(clock.instant()))
83 log.debug("Storing data, last commit: {}", lastCommit);
84 partitions.forEach(partition -> stateRepository.save(
87 recordHandler.getState(partition).getState(),
88 adderResults.getState(partition))));
89 lastCommit = clock.instant();