Merge der Refaktorisierung des EndlessConsumer (Branch 'stored-state')
[demos/kafka/training] / src / main / java / de / juplo / kafka / EndlessConsumer.java
1 package de.juplo.kafka;
2
3 import lombok.RequiredArgsConstructor;
4 import lombok.extern.slf4j.Slf4j;
5 import org.apache.kafka.clients.consumer.*;
6 import org.apache.kafka.common.TopicPartition;
7 import org.apache.kafka.common.errors.RecordDeserializationException;
8 import org.apache.kafka.common.errors.WakeupException;
9
10 import javax.annotation.PreDestroy;
11 import java.time.Duration;
12 import java.util.*;
13 import java.util.concurrent.ExecutionException;
14 import java.util.concurrent.ExecutorService;
15 import java.util.concurrent.locks.Condition;
16 import java.util.concurrent.locks.Lock;
17 import java.util.concurrent.locks.ReentrantLock;
18
19
20 @Slf4j
21 @RequiredArgsConstructor
22 public class EndlessConsumer<K, V> implements ConsumerRebalanceListener, Runnable
23 {
24   private final ExecutorService executor;
25   private final PartitionStatisticsRepository repository;
26   private final String id;
27   private final String topic;
28   private final Consumer<K, V> consumer;
29   private final java.util.function.Consumer<ConsumerRecord<K, V>> handler;
30
31   private final Lock lock = new ReentrantLock();
32   private final Condition condition = lock.newCondition();
33   private boolean running = false;
34   private Exception exception;
35   private long consumed = 0;
36
37   private final Map<Integer, Map<String, Long>> seen = new HashMap<>();
38   private final Map<Integer, Long> offsets = new HashMap<>();
39
40
41   @Override
42   public void onPartitionsRevoked(Collection<TopicPartition> partitions)
43   {
44     partitions.forEach(tp ->
45     {
46       Integer partition = tp.partition();
47       Long newOffset = consumer.position(tp);
48       Long oldOffset = offsets.remove(partition);
49       log.info(
50           "{} - removing partition: {}, consumed {} records (offset {} -> {})",
51           id,
52           partition,
53           newOffset - oldOffset,
54           oldOffset,
55           newOffset);
56       Map<String, Long> removed = seen.remove(partition);
57       for (String key : removed.keySet())
58       {
59         log.info(
60             "{} - Seen {} messages for partition={}|key={}",
61             id,
62             removed.get(key),
63             partition,
64             key);
65       }
66       repository.save(new StatisticsDocument(partition, removed, consumer.position(tp)));
67     });
68   }
69
70   @Override
71   public void onPartitionsAssigned(Collection<TopicPartition> partitions)
72   {
73     partitions.forEach(tp ->
74     {
75       Integer partition = tp.partition();
76       Long offset = consumer.position(tp);
77       log.info("{} - adding partition: {}, offset={}", id, partition, offset);
78       StatisticsDocument document =
79           repository
80               .findById(Integer.toString(partition))
81               .orElse(new StatisticsDocument(partition));
82       consumer.seek(tp, document.offset);
83       seen.put(partition, document.statistics);
84     });
85   }
86
87
88   @Override
89   public void run()
90   {
91     try
92     {
93       log.info("{} - Subscribing to topic {}", id, topic);
94       consumer.subscribe(Arrays.asList(topic), this);
95
96       while (true)
97       {
98         ConsumerRecords<K, V> records =
99             consumer.poll(Duration.ofSeconds(1));
100
101         // Do something with the data...
102         log.info("{} - Received {} messages", id, records.count());
103         for (ConsumerRecord<K, V> record : records)
104         {
105           log.info(
106               "{} - {}: {}/{} - {}={}",
107               id,
108               record.offset(),
109               record.topic(),
110               record.partition(),
111               record.key(),
112               record.value()
113           );
114
115           handler.accept(record);
116
117           consumed++;
118
119           Integer partition = record.partition();
120           String key = record.key() == null ? "NULL" : record.key().toString();
121           Map<String, Long> byKey = seen.get(partition);
122
123           if (!byKey.containsKey(key))
124             byKey.put(key, 0l);
125
126           long seenByKey = byKey.get(key);
127           seenByKey++;
128           byKey.put(key, seenByKey);
129         }
130
131         seen.forEach((partiton, statistics) -> repository.save(
132             new StatisticsDocument(
133                 partiton,
134                 statistics,
135                 consumer.position(new TopicPartition(topic, partiton)))));
136       }
137     }
138     catch(WakeupException e)
139     {
140       log.info("{} - RIIING! Request to stop consumption - commiting current offsets!", id);
141       shutdown();
142     }
143     catch(RecordDeserializationException e)
144     {
145       TopicPartition tp = e.topicPartition();
146       long offset = e.offset();
147       log.error(
148           "{} - Could not deserialize  message on topic {} with offset={}: {}",
149           id,
150           tp,
151           offset,
152           e.getCause().toString());
153
154       shutdown(e);
155     }
156     catch(Exception e)
157     {
158       log.error("{} - Unexpected error: {}", id, e.toString(), e);
159       shutdown(e);
160     }
161     finally
162     {
163       log.info("{} - Consumer-Thread exiting", id);
164     }
165   }
166
167   private void shutdown()
168   {
169     shutdown(null);
170   }
171
172   private void shutdown(Exception e)
173   {
174     lock.lock();
175     try
176     {
177       try
178       {
179         log.info("{} - Unsubscribing from topic {}", id, topic);
180         consumer.unsubscribe();
181       }
182       catch (Exception ue)
183       {
184         log.error(
185             "{} - Error while unsubscribing from topic {}: {}",
186             id,
187             topic,
188             ue.toString());
189       }
190       finally
191       {
192         running = false;
193         exception = e;
194         condition.signal();
195       }
196     }
197     finally
198     {
199       lock.unlock();
200     }
201   }
202
203   public Map<Integer, Map<String, Long>> getSeen()
204   {
205     return seen;
206   }
207
208   public void start()
209   {
210     lock.lock();
211     try
212     {
213       if (running)
214         throw new IllegalStateException("Consumer instance " + id + " is already running!");
215
216       log.info("{} - Starting - consumed {} messages before", id, consumed);
217       running = true;
218       exception = null;
219       executor.submit(this);
220     }
221     finally
222     {
223       lock.unlock();
224     }
225   }
226
227   public synchronized void stop() throws ExecutionException, InterruptedException
228   {
229     lock.lock();
230     try
231     {
232       if (!running)
233         throw new IllegalStateException("Consumer instance " + id + " is not running!");
234
235       log.info("{} - Stopping", id);
236       consumer.wakeup();
237       condition.await();
238       log.info("{} - Stopped - consumed {} messages so far", id, consumed);
239     }
240     finally
241     {
242       lock.unlock();
243     }
244   }
245
246   @PreDestroy
247   public void destroy() throws ExecutionException, InterruptedException
248   {
249     log.info("{} - Destroy!", id);
250     try
251     {
252       stop();
253     }
254     catch (IllegalStateException e)
255     {
256       log.info("{} - Was already stopped", id);
257     }
258     catch (Exception e)
259     {
260       log.error("{} - Unexpected exception while trying to stop the consumer", id, e);
261     }
262     finally
263     {
264       log.info("{}: Consumed {} messages in total, exiting!", id, consumed);
265     }
266   }
267
268   public boolean running()
269   {
270     lock.lock();
271     try
272     {
273       return running;
274     }
275     finally
276     {
277       lock.unlock();
278     }
279   }
280
281   public Optional<Exception> exitStatus()
282   {
283     lock.lock();
284     try
285     {
286       if (running)
287         throw new IllegalStateException("No exit-status available: Consumer instance " + id + " is running!");
288
289       return Optional.ofNullable(exception);
290     }
291     finally
292     {
293       lock.unlock();
294     }
295   }
296 }