1 package de.juplo.kafka;
3 import lombok.RequiredArgsConstructor;
4 import lombok.extern.slf4j.Slf4j;
5 import org.apache.kafka.common.TopicPartition;
10 @RequiredArgsConstructor
12 public class ApplicationRebalanceListener implements RebalanceListener
// Collaborator that is given a partition's state and offset on assignment
// (addPartition) and returns its ApplicationState on revocation (removePartition).
14 private final ApplicationRecordHandler recordHandler;
// Holder of per-partition results; partitions are added to and removed from it
// in the same callbacks that add/remove them on recordHandler.
15 private final AdderResults adderResults;
// Repository that beforeNextPoll() calls save(...) on.
// NOTE(review): presumably also the receiver of the findById(...) lookup in
// onPartitionsAssigned -- the receiver line is not visible here; confirm.
16 private final StateRepository stateRepository;
// Identifier passed as the first argument of this class's log messages.
17 private final String id;
// Partition numbers currently tracked by this listener: added in
// onPartitionsAssigned, removed in onPartitionsRevoked.
19 private final Set<Integer> partitions = new HashSet<>();
/**
 * Handles newly assigned partitions. For each assigned partition it records
 * the partition number in the listener's own set, looks up a
 * {@code StateDocument} by the partition number rendered as a string (using a
 * new {@code StateDocument(partition)} when the lookup yields nothing), and
 * passes the document's state and offset to {@code recordHandler} and its
 * results to {@code adderResults}.
 *
 * NOTE(review): parts of this method (braces, the receiver of
 * {@code findById}, the opening of the multi-line log calls and their
 * arguments) are not visible in this excerpt; the description above covers
 * only the visible statements.
 *
 * @param partitions the topic partitions being assigned; note that this
 *                   parameter shadows the field of the same name, which is
 *                   why the field is accessed as {@code this.partitions}
 */
22 public void onPartitionsAssigned(Collection<TopicPartition> partitions)
24 partitions.forEach(tp ->
26 Integer partition = tp.partition();
27 log.info("{} - adding partition: {}", id, partition);
// The field, not the method parameter: remember the partition number.
28 this.partitions.add(partition);
// The lookup key is the partition number as a string.
// NOTE(review): the receiver of findById is not visible here -- presumably
// stateRepository; confirm.
29 StateDocument document =
31 .findById(Integer.toString(partition))
// NOTE(review): if findById returns java.util.Optional, orElse(...) evaluates
// its argument eagerly, so a new StateDocument is constructed even when one
// was found; orElseGet(...) would defer that -- confirm before changing.
32 .orElse(new StateDocument(partition));
34 "{} - Offset of next unseen message for partition {}: {}",
// Hand the document's state and offset for this partition to the record handler.
38 recordHandler.addPartition(partition, document.state, document.offset);
// Log one line per user key present in the document's state.
39 for (String user : document.state.keySet())
42 "{} - Restored state for partition={}|user={}: {}",
46 document.state.get(user));
// Hand the document's results for this partition to the results holder.
48 adderResults.addPartition(partition, document.results);
/**
 * Handles revoked partitions. For each revoked partition it removes the
 * partition number from the listener's own set, removes the partition from
 * {@code recordHandler} (obtaining its {@code ApplicationState}), logs the
 * adder state per user, and removes the partition's results from
 * {@code adderResults}.
 *
 * NOTE(review): the statement that consumes {@code state.getAdderState()}
 * and {@code results} at the end of the lambda is not fully visible in this
 * excerpt -- presumably a {@code stateRepository.save(...)} call like the one
 * in {@code beforeNextPoll()}; confirm against the full file.
 *
 * @param partitions the topic partitions being revoked; shadows the field of
 *                   the same name, hence {@code this.partitions} below
 */
53 public void onPartitionsRevoked(Collection<TopicPartition> partitions)
55 partitions.forEach(tp ->
57 Integer partition = tp.partition();
58 log.info("{} - removing partition: {}", id, partition);
// The field, not the method parameter: stop tracking this partition number.
59 this.partitions.remove(partition);
// Removing the partition from the record handler yields its current state.
60 ApplicationState state = recordHandler.removePartition(partition);
62 "{} - offset of next unseen message for partition {} is {}",
// Log one line per user key present in the adder state.
66 for (String user : state.getAdderState().keySet())
69 "{} - Saved state for partition={}|user={}: {}",
73 state.getAdderState().get(user));
// Removing the partition from the results holder yields its results, keyed by String.
75 Map<String, List<AdderResult>> results = adderResults.removePartition(partition);
// Argument of a call whose opening line is not visible in this excerpt.
79 state.getAdderState(),
86 public void beforeNextPoll()
92 log.info("{} - persisting state & offset for partition: {}", id, partition);
93 ApplicationState state = recordHandler.getState(partition);
95 "{} - offset of next unseen message for partition {} is {}",
99 for (String user : state.getAdderState().keySet())
102 "{} - Saved state for partition={}|user={}: {}",
106 state.getAdderState().get(user));
108 Map<String, List<AdderResult>> results = adderResults.getState(partition);
109 stateRepository.save(
112 state.getAdderState(),