5e1a12cce260faa457fbca66958750dce485b9c9
[demos/kafka/training] / src / main / java / de / juplo / kafka / ApplicationRebalanceListener.java
1 package de.juplo.kafka;
2
3 import lombok.RequiredArgsConstructor;
4 import lombok.extern.slf4j.Slf4j;
5 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
6 import org.apache.kafka.common.TopicPartition;
7
8 import java.util.*;
9
10
11 @RequiredArgsConstructor
12 @Slf4j
13 public class ApplicationRebalanceListener implements ConsumerRebalanceListener
14 {
15   private final ApplicationRecordHandler recordHandler;
16   private final AdderResults adderResults;
17   private final StateRepository stateRepository;
18   private final String id;
19
20   private final Set<Integer> partitions = new HashSet<>();
21
22   @Override
23   public void onPartitionsAssigned(Collection<TopicPartition> partitions)
24   {
25     partitions.forEach(tp ->
26     {
27       Integer partition = tp.partition();
28       log.info("{} - adding partition: {}", id, partition);
29       this.partitions.add(partition);
30       StateDocument document =
31           stateRepository
32               .findById(Integer.toString(partition))
33               .orElse(new StateDocument(partition));
34       log.info(
35         "{} - Offset of next unseen message for partition {}: {}",
36         id,
37         partition,
38         document.offset);
39       recordHandler.addPartition(partition, document.state, document.offset);
40       for (String user : document.state.keySet())
41       {
42         log.info(
43             "{} - Restored state for partition={}|user={}: {}",
44             id,
45             partition,
46             user,
47             document.state.get(user));
48       }
49       adderResults.addPartition(partition, document.results);
50     });
51   }
52
53   @Override
54   public void onPartitionsRevoked(Collection<TopicPartition> partitions)
55   {
56     partitions.forEach(tp ->
57     {
58       Integer partition = tp.partition();
59       log.info("{} - removing partition: {}", id, partition);
60       this.partitions.remove(partition);
61       ApplicationState state = recordHandler.removePartition(partition);
62       log.info(
63           "{} - offset of next unseen message for partition {} is {}",
64           id,
65           partition,
66           state.getOffset());
67       for (String user : state.getAdderState().keySet())
68       {
69         log.info(
70             "{} - Saved state for partition={}|user={}: {}",
71             id,
72             partition,
73             user,
74             state.getAdderState().get(user));
75       }
76       Map<String, List<AdderResult>> results = adderResults.removePartition(partition);
77       stateRepository.save(
78         new StateDocument(
79           partition,
80           state.getAdderState(),
81           results,
82           state.getOffset()));
83     });
84   }
85 }