6d0c69d878d3a63805c68877bfecb410ac5697e3
[demos/kafka/training] / src / main / java / de / juplo / kafka / EndlessConsumer.java
1 package de.juplo.kafka;
2
3 import lombok.RequiredArgsConstructor;
4 import lombok.extern.slf4j.Slf4j;
5 import org.apache.kafka.clients.consumer.ConsumerRecord;
6 import org.apache.kafka.common.TopicPartition;
7 import org.springframework.beans.factory.annotation.Autowired;
8 import org.springframework.beans.factory.annotation.Value;
9 import org.springframework.kafka.annotation.KafkaListener;
10 import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
11 import org.springframework.kafka.listener.ConsumerAwareRebalanceListener;
12 import org.springframework.stereotype.Component;
13
14 import java.util.Collection;
15 import java.util.HashMap;
16 import java.util.Map;
17 import java.util.Optional;
18 import java.util.function.Consumer;
19
20
21 @Component
22 @Slf4j
23 @RequiredArgsConstructor
24 public class EndlessConsumer<K, V> implements ConsumerAwareRebalanceListener
25 {
26   @Autowired
27   private KafkaListenerEndpointRegistry registry;
28   @Value("${spring.kafka.consumer.client-id}")
29   String id;
30   @Autowired
31   Consumer<ConsumerRecord<K, V>> handler;
32   @Autowired
33   ApplicationErrorHandler errorHandler;
34
35   private long consumed = 0;
36
37   private final Map<Integer, Map<String, Long>> seen = new HashMap<>();
38   private final Map<Integer, Long> offsets = new HashMap<>();
39
40
41   @Override
42   public void onPartitionsRevokedBeforeCommit(
43       org.apache.kafka.clients.consumer.Consumer<?, ?> consumer,
44       Collection<TopicPartition> partitions)
45   {
46     partitions.forEach(tp ->
47     {
48       Integer partition = tp.partition();
49       Long newOffset = consumer.position(tp);
50       Long oldOffset = offsets.remove(partition);
51       log.info(
52           "{} - removing partition: {}, consumed {} records (offset {} -> {})",
53           id,
54           partition,
55           newOffset - oldOffset,
56           oldOffset,
57           newOffset);
58       Map<String, Long> removed = seen.remove(partition);
59       for (String key : removed.keySet())
60       {
61         log.info(
62             "{} - Seen {} messages for partition={}|key={}",
63             id,
64             removed.get(key),
65             partition,
66             key);
67       }
68     });
69   }
70
71   @Override
72   public void onPartitionsAssigned(
73       org.apache.kafka.clients.consumer.Consumer<?, ?> consumer,
74       Collection<TopicPartition> partitions)
75   {
76     partitions.forEach(tp ->
77     {
78       Integer partition = tp.partition();
79       Long offset = consumer.position(tp);
80       log.info("{} - adding partition: {}, offset={}", id, partition, offset);
81       offsets.put(partition, offset);
82       seen.put(partition, new HashMap<>());
83     });
84   }
85
86
87   @KafkaListener(
88       id = "${spring.kafka.consumer.client-id}",
89       idIsGroup = false,
90       topics = "${consumer.topic}",
91       autoStartup = "false")
92   public void receive(ConsumerRecord<K, V> record)
93   {
94     log.info(
95         "{} - {}: {}/{} - {}={}",
96         id,
97         record.offset(),
98         record.topic(),
99         record.partition(),
100         record.key(),
101         record.value()
102     );
103
104     handler.accept(record);
105
106     consumed++;
107   }
108
109
110   public synchronized void start()
111   {
112     if (registry.getListenerContainer(id).isChildRunning())
113       throw new IllegalStateException("Consumer instance " + id + " is already running!");
114
115     log.info("{} - Starting - consumed {} messages before", id, consumed);
116     errorHandler.clearException();
117     registry.getListenerContainer(id).start();
118   }
119
120   public synchronized void stop()
121   {
122     if (!registry.getListenerContainer(id).isChildRunning())
123       throw new IllegalStateException("Consumer instance " + id + " is not running!");
124
125     log.info("{} - Stopping", id);
126     registry.getListenerContainer(id).stop();
127     log.info("{} - Stopped - consumed {} messages so far", id, consumed);
128   }
129
130   public synchronized Optional<Exception> exitStatus()
131   {
132     if (registry.getListenerContainer(id).isChildRunning())
133       throw new IllegalStateException("No exit-status available: Consumer instance " + id + " is running!");
134
135     return errorHandler.getException();
136   }
137 }