Die Implementierung speichert Zustand & Offsets vor _jedem_ `poll()`
[demos/kafka/training] / src / main / java / de / juplo / kafka / ApplicationRebalanceListener.java
1 package de.juplo.kafka;
2
3 import lombok.RequiredArgsConstructor;
4 import lombok.extern.slf4j.Slf4j;
5 import org.apache.kafka.common.TopicPartition;
6
7 import java.util.*;
8
9
10 @RequiredArgsConstructor
11 @Slf4j
12 public class ApplicationRebalanceListener implements RebalanceListener
13 {
14   private final ApplicationRecordHandler recordHandler;
15   private final AdderResults adderResults;
16   private final StateRepository stateRepository;
17   private final String id;
18
19   private final Set<Integer> partitions = new HashSet<>();
20
21   @Override
22   public void onPartitionsAssigned(Collection<TopicPartition> partitions)
23   {
24     partitions.forEach(tp ->
25     {
26       Integer partition = tp.partition();
27       log.info("{} - adding partition: {}", id, partition);
28       this.partitions.add(partition);
29       StateDocument document =
30           stateRepository
31               .findById(Integer.toString(partition))
32               .orElse(new StateDocument(partition));
33       log.info(
34         "{} - Offset of next unseen message for partition {}: {}",
35         id,
36         partition,
37         document.offset);
38       recordHandler.addPartition(partition, document.state, document.offset);
39       for (String user : document.state.keySet())
40       {
41         log.info(
42             "{} - Restored state for partition={}|user={}: {}",
43             id,
44             partition,
45             user,
46             document.state.get(user));
47       }
48       adderResults.addPartition(partition, document.results);
49     });
50   }
51
52   @Override
53   public void onPartitionsRevoked(Collection<TopicPartition> partitions)
54   {
55     partitions.forEach(tp ->
56     {
57       Integer partition = tp.partition();
58       log.info("{} - removing partition: {}", id, partition);
59       this.partitions.remove(partition);
60       ApplicationState state = recordHandler.removePartition(partition);
61       log.info(
62           "{} - offset of next unseen message for partition {} is {}",
63           id,
64           partition,
65           state.getOffset());
66       for (String user : state.getAdderState().keySet())
67       {
68         log.info(
69             "{} - Saved state for partition={}|user={}: {}",
70             id,
71             partition,
72             user,
73             state.getAdderState().get(user));
74       }
75       Map<String, List<AdderResult>> results = adderResults.removePartition(partition);
76       stateRepository.save(
77         new StateDocument(
78           partition,
79           state.getAdderState(),
80           results,
81           state.getOffset()));
82     });
83   }
84
85   @Override
86   public void beforeNextPoll()
87   {
88     partitions
89       .stream()
90       .forEach(partition ->
91       {
92         log.info("{} - persisting state & offset for partition: {}", id, partition);
93         ApplicationState state = recordHandler.getState(partition);
94         log.info(
95           "{} - offset of next unseen message for partition {} is {}",
96           id,
97           partition,
98           state.getOffset());
99         for (String user : state.getAdderState().keySet())
100         {
101           log.info(
102             "{} - Saved state for partition={}|user={}: {}",
103             id,
104             partition,
105             user,
106             state.getAdderState().get(user));
107         }
108         Map<String, List<AdderResult>> results = adderResults.getState(partition);
109         stateRepository.save(
110           new StateDocument(
111             partition,
112             state.getAdderState(),
113             results,
114             state.getOffset()));
115       });
116   }
117 }