1 package de.juplo.kafka;
3 import lombok.RequiredArgsConstructor;
4 import lombok.extern.slf4j.Slf4j;
5 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
6 import org.apache.kafka.common.TopicPartition;
7 import org.springframework.kafka.listener.ConsumerAwareRebalanceListener;
12 @RequiredArgsConstructor
14 public class ApplicationRebalanceListener implements ConsumerAwareRebalanceListener
16 private final ApplicationRecordHandler recordHandler;
17 private final AdderResults adderResults;
18 private final StateRepository stateRepository;
19 private final String id;
21 private final Set<Integer> partitions = new HashSet<>();
24 public void onPartitionsAssigned(Collection<TopicPartition> partitions)
26 partitions.forEach(tp ->
28 Integer partition = tp.partition();
29 log.info("{} - adding partition: {}", id, partition);
30 this.partitions.add(partition);
31 StateDocument document =
33 .findById(Integer.toString(partition))
34 .orElse(new StateDocument(partition));
35 recordHandler.addPartition(partition, document.state);
36 for (String user : document.state.keySet())
39 "{} - Restored state for partition={}|user={}: {}",
43 document.state.get(user));
45 adderResults.addPartition(partition, document.results);
50 public void onPartitionsRevoked(Collection<TopicPartition> partitions)
52 partitions.forEach(tp ->
54 Integer partition = tp.partition();
55 log.info("{} - removing partition: {}", id, partition);
56 this.partitions.remove(partition);
57 Map<String, AdderResult> state = recordHandler.removePartition(partition);
58 for (String user : state.keySet())
61 "{} - Saved state for partition={}|user={}: {}",
67 Map<String, List<AdderResult>> results = adderResults.removePartition(partition);
68 stateRepository.save(new StateDocument(partition, state, results));