1 package de.juplo.kafka;
3 import lombok.RequiredArgsConstructor;
4 import lombok.extern.slf4j.Slf4j;
5 import org.apache.kafka.clients.consumer.Consumer;
6 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
7 import org.apache.kafka.common.TopicPartition;
9 import java.time.Clock;
10 import java.time.Duration;
11 import java.time.Instant;
13 import java.util.concurrent.CountDownLatch;
16 @RequiredArgsConstructor
18 public class ApplicationRebalanceListener implements ConsumerRebalanceListener
// Kafka consumer; onPartitionsRevoked calls commitAsync on it to commit offsets.
20 private final Consumer consumer;
// Receives the restored per-user state of a partition via addPartition and
// hands it back via removePartition when the partition is revoked.
21 private final ApplicationRecordHandler recordHandler;
// Receives the restored results of a partition via addPartition and hands
// them back via removePartition when the partition is revoked.
22 private final AdderResults adderResults;
// Persistent store for StateDocument instances; written with save(...) when a
// partition is revoked.
23 private final StateRepository stateRepository;
// Identifier of this instance; used as the leading "{}" value of every log message.
24 private final String id;
// Numbers of the partitions currently tracked by this listener: added in
// onPartitionsAssigned, removed in onPartitionsRevoked.
26 private final Set<Integer> partitions = new HashSet<>();
// Handles newly assigned partitions. For each one it records the partition
// number, looks up the StateDocument stored under the partition number (as a
// string id), falling back to a new StateDocument(partition) when none is
// found, and hands document.state to the record handler and document.results
// to adderResults. Every restored user entry is logged.
//
// The parameter "partitions" shadows the field of the same name, which is why
// the field is accessed as this.partitions below.
29 public void onPartitionsAssigned(Collection<TopicPartition> partitions)
31 partitions.forEach(tp ->
33 Integer partition = tp.partition();
34 log.info("{} - adding partition: {}", id, partition);
// Remember the partition number in the field (not the shadowing parameter).
35 this.partitions.add(partition);
36 StateDocument document =
// NOTE(review): the receiver of findById is not visible in this excerpt -
// presumably stateRepository; confirm. The id is the partition number as a string.
38 .findById(Integer.toString(partition))
// NOTE(review): if findById returns java.util.Optional, orElse evaluates its
// argument eagerly, so a StateDocument is constructed even when a stored one
// was found; orElseGet would defer the construction.
39 .orElse(new StateDocument(partition));
// Hand the per-user state of this partition to the record handler.
40 recordHandler.addPartition(partition, document.state);
// Log every restored user entry.
// NOTE(review): iterating keySet() and calling get(user) does a second lookup
// per entry; entrySet() would avoid it.
41 for (String user : document.state.keySet())
44 "{} - Restored state for partition={}|user={}: {}",
48 document.state.get(user));
// Hand the results stored for this partition to adderResults.
50 adderResults.addPartition(partition, document.results);
55 public void onPartitionsRevoked(Collection<TopicPartition> partitions)
57 log.info("{} - Commiting offsets for all previously assigned partitions", id);
58 CountDownLatch commitDone = new CountDownLatch(1);
59 consumer.commitAsync((offsets, e) ->
61 commitDone.countDown();
64 log.error("{} - Could not commit offsets to Kafka!", id, e);
68 offsets.entrySet().stream().forEach(entry ->
70 log.info("{} - Commited offset for {}: {}", id, entry.getKey(), entry.getValue());
75 partitions.forEach(tp ->
77 Integer partition = tp.partition();
78 log.info("{} - removing partition: {}", id, partition);
79 this.partitions.remove(partition);
80 Map<String, AdderResult> state = recordHandler.removePartition(partition);
81 for (String user : state.keySet())
84 "{} - Saved state for partition={}|user={}: {}",
90 Map<String, List<AdderResult>> results = adderResults.removePartition(partition);
91 stateRepository.save(new StateDocument(partition, state, results));
96 log.debug("{} - Waiting for async commit to complete", id);
99 catch (InterruptedException e)
102 "{} - Interrupted while waiting for async commit in onPartitionsRevoked({})",