0bfee676e7140ad6bb4c4aad87deca36a2369436
[demos/kafka/training] / src / main / java / de / juplo / kafka / ApplicationRebalanceListener.java
1 package de.juplo.kafka;
2
3 import lombok.RequiredArgsConstructor;
4 import lombok.extern.slf4j.Slf4j;
5 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
6 import org.apache.kafka.common.TopicPartition;
7
8 import java.util.*;
9
10
11 @RequiredArgsConstructor
12 @Slf4j
13 public class ApplicationRebalanceListener implements ConsumerRebalanceListener
14 {
15   private final ApplicationRecordHandler recordHandler;
16   private final AdderResults adderResults;
17   private final StateRepository stateRepository;
18   private final String id;
19
20   private final Set<Integer> partitions = new HashSet<>();
21
22   @Override
23   public void onPartitionsAssigned(Collection<TopicPartition> partitions)
24   {
25     partitions.forEach(tp ->
26     {
27       Integer partition = tp.partition();
28       log.info("{} - adding partition: {}", id, partition);
29       this.partitions.add(partition);
30       StateDocument document =
31           stateRepository
32               .findById(Integer.toString(partition))
33               .orElse(new StateDocument(partition));
34       recordHandler.addPartition(partition, document.state);
35       for (String user : document.state.keySet())
36       {
37         log.info(
38             "{} - Restored state for partition={}|user={}: {}",
39             id,
40             partition,
41             user,
42             document.state.get(user));
43       }
44       adderResults.addPartition(partition, document.results);
45     });
46   }
47
48   @Override
49   public void onPartitionsRevoked(Collection<TopicPartition> partitions)
50   {
51     partitions.forEach(tp ->
52     {
53       Integer partition = tp.partition();
54       log.info("{} - removing partition: {}", id, partition);
55       this.partitions.remove(partition);
56       Map<String, AdderResult> state = recordHandler.removePartition(partition);
57       for (String user : state.keySet())
58       {
59         log.info(
60             "{} - Saved state for partition={}|user={}: {}",
61             id,
62             partition,
63             user,
64             state.get(user));
65       }
66       Map<String, List<AdderResult>> results = adderResults.removePartition(partition);
67       stateRepository.save(new StateDocument(partition, state, results));
68     });
69   }
70 }