Der Test verwendet die `@Bean` von `EndlessConsumer`
[demos/kafka/training] / src / main / java / de / juplo / kafka / EndlessConsumer.java
1 package de.juplo.kafka;
2
3 import lombok.RequiredArgsConstructor;
4 import lombok.extern.slf4j.Slf4j;
5 import org.apache.kafka.clients.consumer.Consumer;
6 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
7 import org.apache.kafka.clients.consumer.ConsumerRecord;
8 import org.apache.kafka.clients.consumer.ConsumerRecords;
9 import org.apache.kafka.common.TopicPartition;
10 import org.apache.kafka.common.errors.RecordDeserializationException;
11 import org.apache.kafka.common.errors.WakeupException;
12
13 import javax.annotation.PreDestroy;
14 import java.time.Duration;
15 import java.util.*;
16 import java.util.concurrent.ExecutionException;
17 import java.util.concurrent.ExecutorService;
18 import java.util.concurrent.locks.Condition;
19 import java.util.concurrent.locks.Lock;
20 import java.util.concurrent.locks.ReentrantLock;
21
22
23 @Slf4j
24 @RequiredArgsConstructor
25 public class EndlessConsumer<K, V> implements ConsumerRebalanceListener, Runnable
26 {
27   private final ExecutorService executor;
28   private final String id;
29   private final String topic;
30   private final Consumer<K, V> consumer;
31   private final RecordHandler handler;
32
33   private final Lock lock = new ReentrantLock();
34   private final Condition condition = lock.newCondition();
35   private boolean running = false;
36   private Exception exception;
37   private long consumed = 0;
38
39   private final Map<Integer, Map<String, Long>> seen = new HashMap<>();
40   private final Map<Integer, Long> offsets = new HashMap<>();
41
42
43   @Override
44   public void onPartitionsRevoked(Collection<TopicPartition> partitions)
45   {
46     partitions.forEach(tp ->
47     {
48       Integer partition = tp.partition();
49       Long newOffset = consumer.position(tp);
50       Long oldOffset = offsets.remove(partition);
51       log.info(
52           "{} - removing partition: {}, consumed {} records (offset {} -> {})",
53           id,
54           partition,
55           newOffset - oldOffset,
56           oldOffset,
57           newOffset);
58       Map<String, Long> removed = seen.remove(partition);
59       for (String key : removed.keySet())
60       {
61         log.info(
62             "{} - Seen {} messages for partition={}|key={}",
63             id,
64             removed.get(key),
65             partition,
66             key);
67       }
68     });
69   }
70
71   @Override
72   public void onPartitionsAssigned(Collection<TopicPartition> partitions)
73   {
74     partitions.forEach(tp ->
75     {
76       Integer partition = tp.partition();
77       Long offset = consumer.position(tp);
78       log.info("{} - adding partition: {}, offset={}", id, partition, offset);
79       offsets.put(partition, offset);
80       seen.put(partition, new HashMap<>());
81     });
82   }
83
84
85   @Override
86   public void run()
87   {
88     try
89     {
90       log.info("{} - Subscribing to topic {}", id, topic);
91       consumer.subscribe(Arrays.asList(topic), this);
92
93       while (true)
94       {
95         ConsumerRecords<K, V> records =
96             consumer.poll(Duration.ofSeconds(1));
97
98         // Do something with the data...
99         log.info("{} - Received {} messages", id, records.count());
100         for (ConsumerRecord<K, V> record : records)
101         {
102           log.info(
103               "{} - {}: {}/{} - {}={}",
104               id,
105               record.offset(),
106               record.topic(),
107               record.partition(),
108               record.key(),
109               record.value()
110           );
111
112           handler.accept(record);
113
114           consumed++;
115
116           Integer partition = record.partition();
117           String key = record.key() == null ? "NULL" : record.key().toString();
118           Map<String, Long> byKey = seen.get(partition);
119
120           if (!byKey.containsKey(key))
121             byKey.put(key, 0l);
122
123           long seenByKey = byKey.get(key);
124           seenByKey++;
125           byKey.put(key, seenByKey);
126         }
127       }
128     }
129     catch(WakeupException e)
130     {
131       log.info("{} - RIIING! Request to stop consumption - commiting current offsets!", id);
132       consumer.commitSync();
133       shutdown();
134     }
135     catch(RecordDeserializationException e)
136     {
137       TopicPartition tp = e.topicPartition();
138       long offset = e.offset();
139       log.error(
140           "{} - Could not deserialize  message on topic {} with offset={}: {}",
141           id,
142           tp,
143           offset,
144           e.getCause().toString());
145
146       consumer.commitSync();
147       shutdown(e);
148     }
149     catch(Exception e)
150     {
151       log.error("{} - Unexpected error: {}", id, e.toString(), e);
152       shutdown(e);
153     }
154     finally
155     {
156       log.info("{} - Consumer-Thread exiting", id);
157     }
158   }
159
160   private void shutdown()
161   {
162     shutdown(null);
163   }
164
165   private void shutdown(Exception e)
166   {
167     lock.lock();
168     try
169     {
170       try
171       {
172         log.info("{} - Unsubscribing from topic {}", id, topic);
173         consumer.unsubscribe();
174       }
175       catch (Exception ue)
176       {
177         log.error(
178             "{} - Error while unsubscribing from topic {}: {}",
179             id,
180             topic,
181             ue.toString());
182       }
183       finally
184       {
185         running = false;
186         exception = e;
187         condition.signal();
188       }
189     }
190     finally
191     {
192       lock.unlock();
193     }
194   }
195
196   public Map<Integer, Map<String, Long>> getSeen()
197   {
198     return seen;
199   }
200
201   public void start()
202   {
203     lock.lock();
204     try
205     {
206       if (running)
207         throw new IllegalStateException("Consumer instance " + id + " is already running!");
208
209       log.info("{} - Starting - consumed {} messages before", id, consumed);
210       running = true;
211       exception = null;
212       executor.submit(this);
213     }
214     finally
215     {
216       lock.unlock();
217     }
218   }
219
220   public synchronized void stop() throws ExecutionException, InterruptedException
221   {
222     lock.lock();
223     try
224     {
225       if (!running)
226         throw new IllegalStateException("Consumer instance " + id + " is not running!");
227
228       log.info("{} - Stopping", id);
229       consumer.wakeup();
230       condition.await();
231       log.info("{} - Stopped - consumed {} messages so far", id, consumed);
232     }
233     finally
234     {
235       lock.unlock();
236     }
237   }
238
239   @PreDestroy
240   public void destroy() throws ExecutionException, InterruptedException
241   {
242     log.info("{} - Destroy!", id);
243     try
244     {
245       stop();
246     }
247     catch (IllegalStateException e)
248     {
249       log.info("{} - Was already stopped", id);
250     }
251     catch (Exception e)
252     {
253       log.error("{} - Unexpected exception while trying to stop the consumer", id, e);
254     }
255     finally
256     {
257       log.info("{}: Consumed {} messages in total, exiting!", id, consumed);
258     }
259   }
260
261   public boolean running()
262   {
263     lock.lock();
264     try
265     {
266       return running;
267     }
268     finally
269     {
270       lock.unlock();
271     }
272   }
273
274   public Optional<Exception> exitStatus()
275   {
276     lock.lock();
277     try
278     {
279       if (running)
280         throw new IllegalStateException("No exit-status available: Consumer instance " + id + " is running!");
281
282       return Optional.ofNullable(exception);
283     }
284     finally
285     {
286       lock.unlock();
287     }
288   }
289 }