e214a14b763fb070b92d5da41d5b1ca6d3f3a1f0
[demos/kafka/training] / src / main / java / de / juplo / kafka / ApplicationRebalanceListener.java
1 package de.juplo.kafka;
2
3 import lombok.RequiredArgsConstructor;
4 import lombok.extern.slf4j.Slf4j;
5 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
6 import org.apache.kafka.common.TopicPartition;
7
8 import java.util.*;
9
10
11 @RequiredArgsConstructor
12 @Slf4j
13 public class ApplicationRebalanceListener implements ConsumerRebalanceListener
14 {
15   private final ApplicationRecordHandler recordHandler;
16   private final AdderResults adderResults;
17   private final StateRepository stateRepository;
18   private final String id;
19
20   private final Set<Integer> partitions = new HashSet<>();
21
22   @Override
23   public void onPartitionsAssigned(Collection<TopicPartition> partitions)
24   {
25     partitions.forEach(tp ->
26     {
27       Integer partition = tp.partition();
28       log.info("{} - adding partition: {}", id, partition);
29     });
30   }
31
32   @Override
33   public void onPartitionsRevoked(Collection<TopicPartition> partitions)
34   {
35     partitions.forEach(tp ->
36     {
37       Integer partition = tp.partition();
38       log.info("{} - removing partition: {}", id, partition);
39       Map<String, AdderResult> state = recordHandler.getState(partition).getState();
40       for (String user : state.keySet())
41       {
42         log.info(
43             "{} - Saved state for partition={}|user={}: {}",
44             id,
45             partition,
46             user,
47             state.get(user));
48       }
49     });
50   }
51 }