ROT: Rückbau auf automatische Commits - Testfälle laufen nicht mehr
[demos/kafka/training] / src / main / java / de / juplo / kafka / ApplicationRebalanceListener.java
1 package de.juplo.kafka;
2
3 import lombok.RequiredArgsConstructor;
4 import lombok.extern.slf4j.Slf4j;
5 import org.apache.kafka.common.TopicPartition;
6
7 import java.time.Clock;
8 import java.time.Duration;
9 import java.time.Instant;
10 import java.util.Collection;
11 import java.util.Map;
12
13
14 @RequiredArgsConstructor
15 @Slf4j
16 public class ApplicationRebalanceListener implements PollIntervalAwareConsumerRebalanceListener
17 {
18   private final ApplicationRecordHandler recordHandler;
19   private final StateRepository stateRepository;
20   private final String id;
21   private final Clock clock;
22   private final Duration commitInterval;
23
24   private Instant lastCommit = Instant.EPOCH;
25
26   @Override
27   public void onPartitionsAssigned(Collection<TopicPartition> partitions)
28   {
29     partitions.forEach(tp ->
30     {
31       Integer partition = tp.partition();
32       log.info("{} - adding partition: {}", id, partition);
33       StateDocument document =
34           stateRepository
35               .findById(Integer.toString(partition))
36               .orElse(new StateDocument(partition));
37       recordHandler.addPartition(partition, document.state);
38     });
39   }
40
41   @Override
42   public void onPartitionsRevoked(Collection<TopicPartition> partitions)
43   {
44     partitions.forEach(tp ->
45     {
46       Integer partition = tp.partition();
47       log.info("{} - removing partition: {}", id, partition);
48       Map<String, Long> removed = recordHandler.removePartition(partition);
49       for (String key : removed.keySet())
50       {
51         log.info(
52             "{} - Seen {} messages for partition={}|key={}",
53             id,
54             removed.get(key),
55             partition,
56             key);
57       }
58       stateRepository.save(new StateDocument(partition, removed));
59     });
60   }
61
62
63   @Override
64   public void beforeNextPoll()
65   {
66     if (lastCommit.plus(commitInterval).isBefore(clock.instant()))
67     {
68       log.debug("Storing data, last commit: {}", lastCommit);
69       recordHandler.getState().forEach((partiton, adder) -> stateRepository.save(
70           new StateDocument(
71               partiton,
72               adder.getState())));
73       lastCommit = clock.instant();
74     }
75   }
76 }