1 package de.juplo.kafka;
3 import lombok.RequiredArgsConstructor;
4 import lombok.extern.slf4j.Slf4j;
5 import org.apache.kafka.clients.consumer.Consumer;
6 import org.apache.kafka.common.TopicPartition;
8 import java.time.Clock;
9 import java.time.Duration;
10 import java.time.Instant;
14 @RequiredArgsConstructor
16 public class ApplicationRebalanceListener implements RebalanceListener
18 private final ApplicationRecordHandler recordHandler;
19 private final AdderResults adderResults;
20 private final StateRepository stateRepository;
21 private final String id;
22 private final String topic;
23 private final Clock clock;
24 private final Duration commitInterval;
25 private final Consumer consumer;
27 private final Set<Integer> partitions = new HashSet<>();
29 private Instant lastCommit = Instant.EPOCH;
30 private boolean commitsEnabled = true;
33 public void onPartitionsAssigned(Collection<TopicPartition> partitions)
35 partitions.forEach(tp ->
37 Integer partition = tp.partition();
38 log.info("{} - adding partition: {}", id, partition);
39 this.partitions.add(partition);
40 StateDocument document =
42 .findById(Integer.toString(partition))
43 .orElse(new StateDocument(partition));
44 if (document.offset >= 0)
46 // Only seek, if a stored offset was found
47 // Otherwise: Use initial offset, generated by Kafka
48 consumer.seek(tp, document.offset);
50 "{} - Seeking to offset {} for partition {}",
55 recordHandler.addPartition(partition, document.state);
56 for (String user : document.state.keySet())
59 "{} - Restored state for partition={}|user={}: {}",
63 document.state.get(user));
65 adderResults.addPartition(partition, document.results);
70 public void onPartitionsRevoked(Collection<TopicPartition> partitions)
72 partitions.forEach(tp ->
74 Integer partition = tp.partition();
75 log.info("{} - removing partition: {}", id, partition);
76 this.partitions.remove(partition);
79 Map<String, AdderResult> state = recordHandler.removePartition(partition);
80 Long offset = consumer.position(tp);
82 "{} - offset of next unseen message for partition {} is {}",
86 for (String user : state.keySet())
89 "{} - Saved state for partition={}|user={}: {}",
95 Map<String, List<AdderResult>> results = adderResults.removePartition(partition);
96 stateRepository.save(new StateDocument(partition, state, results, offset));
100 log.info("{} - Offset commits are disabled! Last commit: {}", id, lastCommit);
107 public void beforeNextPoll()
111 log.info("{} - Offset commits are disabled! Last commit: {}", id, lastCommit);
115 if (lastCommit.plus(commitInterval).isBefore(clock.instant()))
119 .forEach(partition ->
121 log.info("{} - persisting state & offset for partition: {}", id, partition);
122 Map<String, AdderResult> state = recordHandler.getState(partition).getState();
123 Long offset = consumer.position(new TopicPartition(topic, partition));
125 "{} - offset of next unseen message for partition {} is {}",
129 for (String user : state.keySet())
132 "{} - Saved state for partition={}|user={}: {}",
138 Map<String, List<AdderResult>> results = adderResults.getState(partition);
139 stateRepository.save(new StateDocument(partition, state, results, offset));
142 lastCommit = clock.instant();
147 public void enableCommits()
149 commitsEnabled = true;
153 public void disableCommits()
155 commitsEnabled = false;