9e75112516bb9129720bcbedbcd4c045889c0b3b
[demos/kafka/training] / src / main / java / de / juplo / kafka / ApplicationRebalanceListener.java
1 package de.juplo.kafka;
2
3 import lombok.RequiredArgsConstructor;
4 import lombok.extern.slf4j.Slf4j;
5 import org.apache.kafka.clients.consumer.Consumer;
6 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
7 import org.apache.kafka.common.TopicPartition;
8
9 import java.time.Clock;
10 import java.time.Duration;
11 import java.time.Instant;
12 import java.util.*;
13 import java.util.concurrent.CountDownLatch;
14
15
16 @RequiredArgsConstructor
17 @Slf4j
18 public class ApplicationRebalanceListener implements ConsumerRebalanceListener
19 {
20   private final Consumer consumer;
21   private final ApplicationRecordHandler recordHandler;
22   private final AdderResults adderResults;
23   private final StateRepository stateRepository;
24   private final String id;
25
26   private final Set<Integer> partitions = new HashSet<>();
27
28   @Override
29   public void onPartitionsAssigned(Collection<TopicPartition> partitions)
30   {
31     partitions.forEach(tp ->
32     {
33       Integer partition = tp.partition();
34       log.info("{} - adding partition: {}", id, partition);
35       this.partitions.add(partition);
36       StateDocument document =
37           stateRepository
38               .findById(Integer.toString(partition))
39               .orElse(new StateDocument(partition));
40       recordHandler.addPartition(partition, document.state);
41       for (String user : document.state.keySet())
42       {
43         log.info(
44             "{} - Restored state for partition={}|user={}: {}",
45             id,
46             partition,
47             user,
48             document.state.get(user));
49       }
50       adderResults.addPartition(partition, document.results);
51     });
52   }
53
54   @Override
55   public void onPartitionsRevoked(Collection<TopicPartition> partitions)
56   {
57     log.info("{} - Commiting offsets for all previously assigned partitions", id);
58     CountDownLatch commitDone = new CountDownLatch(1);
59     consumer.commitAsync((offsets, e) ->
60     {
61       commitDone.countDown();
62       if (e == null)
63       {
64         log.error("{} - Could not commit offsets to Kafka!", id, e);
65       }
66       else
67       {
68         offsets.entrySet().stream().forEach(entry ->
69         {
70           log.info("{} - Commited offset for {}: {}", id, entry.getKey(), entry.getValue());
71         });
72       }
73     });
74
75     partitions.forEach(tp ->
76     {
77       Integer partition = tp.partition();
78       log.info("{} - removing partition: {}", id, partition);
79       this.partitions.remove(partition);
80       Map<String, AdderResult> state = recordHandler.removePartition(partition);
81       for (String user : state.keySet())
82       {
83         log.info(
84             "{} - Saved state for partition={}|user={}: {}",
85             id,
86             partition,
87             user,
88             state.get(user));
89       }
90       Map<String, List<AdderResult>> results = adderResults.removePartition(partition);
91       stateRepository.save(new StateDocument(partition, state, results));
92     });
93
94     try
95     {
96       log.debug("{} - Waiting for async commit to complete", id);
97       commitDone.await();
98     }
99     catch (InterruptedException e)
100     {
101       log.warn(
102         "{} - Interrupted while waiting for async commit in onPartitionsRevoked({})",
103         id,
104         partitions,
105         e);
106     }
107   }
108 }