Umstellung des Nachrichten-Datentyps auf Long zurückgenommen
[demos/kafka/training] / src / main / java / de / juplo / kafka / EndlessConsumer.java
1 package de.juplo.kafka;
2
3 import lombok.RequiredArgsConstructor;
4 import lombok.extern.slf4j.Slf4j;
5 import org.apache.kafka.clients.consumer.*;
6 import org.apache.kafka.common.TopicPartition;
7 import org.apache.kafka.common.errors.RecordDeserializationException;
8 import org.apache.kafka.common.errors.WakeupException;
9
10 import javax.annotation.PreDestroy;
11 import java.time.Clock;
12 import java.time.Duration;
13 import java.time.Instant;
14 import java.util.*;
15 import java.util.concurrent.ExecutionException;
16 import java.util.concurrent.ExecutorService;
17 import java.util.concurrent.locks.Condition;
18 import java.util.concurrent.locks.Lock;
19 import java.util.concurrent.locks.ReentrantLock;
20
21
22 @Slf4j
23 @RequiredArgsConstructor
24 public class EndlessConsumer<K, V> implements ConsumerRebalanceListener, Runnable
25 {
26   private final ExecutorService executor;
27   private final PartitionStatisticsRepository repository;
28   private final String id;
29   private final String topic;
30   private final Clock clock;
31   private final Duration commitInterval;
32   private final Consumer<K, V> consumer;
33   private final java.util.function.Consumer<ConsumerRecord<K, V>> handler;
34
35   private final Lock lock = new ReentrantLock();
36   private final Condition condition = lock.newCondition();
37   private boolean running = false;
38   private Exception exception;
39   private long consumed = 0;
40
41   private final Map<Integer, Map<String, Long>> seen = new HashMap<>();
42
43
44   @Override
45   public void onPartitionsRevoked(Collection<TopicPartition> partitions)
46   {
47     partitions.forEach(tp ->
48     {
49       Integer partition = tp.partition();
50       Long newOffset = consumer.position(tp);
51       log.info(
52           "{} - removing partition: {}, offset of next message {})",
53           id,
54           partition,
55           newOffset);
56       Map<String, Long> removed = seen.remove(partition);
57       for (String key : removed.keySet())
58       {
59         log.info(
60             "{} - Seen {} messages for partition={}|key={}",
61             id,
62             removed.get(key),
63             partition,
64             key);
65       }
66       repository.save(new StatisticsDocument(partition, removed, consumer.position(tp)));
67     });
68   }
69
70   @Override
71   public void onPartitionsAssigned(Collection<TopicPartition> partitions)
72   {
73     partitions.forEach(tp ->
74     {
75       Integer partition = tp.partition();
76       Long offset = consumer.position(tp);
77       log.info("{} - adding partition: {}, offset={}", id, partition, offset);
78       StatisticsDocument document =
79           repository
80               .findById(Integer.toString(partition))
81               .orElse(new StatisticsDocument(partition));
82       if (document.offset >= 0)
83       {
84         // Only seek, if a stored offset was found
85         // Otherwise: Use initial offset, generated by Kafka
86         consumer.seek(tp, document.offset);
87       }
88       seen.put(partition, document.statistics);
89     });
90   }
91
92
93   @Override
94   public void run()
95   {
96     try
97     {
98       log.info("{} - Subscribing to topic {}", id, topic);
99       consumer.subscribe(Arrays.asList(topic), this);
100
101       Instant lastCommit = clock.instant();
102
103       while (true)
104       {
105         ConsumerRecords<K, V> records =
106             consumer.poll(Duration.ofSeconds(1));
107
108         // Do something with the data...
109         log.info("{} - Received {} messages", id, records.count());
110         for (ConsumerRecord<K, V> record : records)
111         {
112           log.info(
113               "{} - {}: {}/{} - {}={}",
114               id,
115               record.offset(),
116               record.topic(),
117               record.partition(),
118               record.key(),
119               record.value()
120           );
121
122           handler.accept(record);
123
124           consumed++;
125
126           Integer partition = record.partition();
127           String key = record.key() == null ? "NULL" : record.key().toString();
128           Map<String, Long> byKey = seen.get(partition);
129
130           if (!byKey.containsKey(key))
131             byKey.put(key, 0l);
132
133           long seenByKey = byKey.get(key);
134           seenByKey++;
135           byKey.put(key, seenByKey);
136         }
137
138         if (lastCommit.plus(commitInterval).isBefore(clock.instant()))
139         {
140           log.debug("Storing data and offsets, last commit: {}", lastCommit);
141           seen.forEach((partiton, statistics) -> repository.save(
142               new StatisticsDocument(
143                   partiton,
144                   statistics,
145                   consumer.position(new TopicPartition(topic, partiton)))));
146           lastCommit = clock.instant();
147         }
148       }
149     }
150     catch(WakeupException e)
151     {
152       log.info("{} - RIIING! Request to stop consumption - commiting current offsets!", id);
153       shutdown();
154     }
155     catch(RecordDeserializationException e)
156     {
157       TopicPartition tp = e.topicPartition();
158       long offset = e.offset();
159       log.error(
160           "{} - Could not deserialize  message on topic {} with offset={}: {}",
161           id,
162           tp,
163           offset,
164           e.getCause().toString());
165
166       shutdown(e);
167     }
168     catch(Exception e)
169     {
170       log.error("{} - Unexpected error: {}", id, e.toString(), e);
171       shutdown(e);
172     }
173     finally
174     {
175       log.info("{} - Consumer-Thread exiting", id);
176     }
177   }
178
179   private void shutdown()
180   {
181     shutdown(null);
182   }
183
184   private void shutdown(Exception e)
185   {
186     lock.lock();
187     try
188     {
189       try
190       {
191         log.info("{} - Unsubscribing from topic {}", id, topic);
192         consumer.unsubscribe();
193       }
194       catch (Exception ue)
195       {
196         log.error(
197             "{} - Error while unsubscribing from topic {}: {}",
198             id,
199             topic,
200             ue.toString());
201       }
202       finally
203       {
204         running = false;
205         exception = e;
206         condition.signal();
207       }
208     }
209     finally
210     {
211       lock.unlock();
212     }
213   }
214
215   public Map<Integer, Map<String, Long>> getSeen()
216   {
217     return seen;
218   }
219
220   public void start()
221   {
222     lock.lock();
223     try
224     {
225       if (running)
226         throw new IllegalStateException("Consumer instance " + id + " is already running!");
227
228       log.info("{} - Starting - consumed {} messages before", id, consumed);
229       running = true;
230       exception = null;
231       executor.submit(this);
232     }
233     finally
234     {
235       lock.unlock();
236     }
237   }
238
239   public synchronized void stop() throws InterruptedException
240   {
241     lock.lock();
242     try
243     {
244       if (!running)
245         throw new IllegalStateException("Consumer instance " + id + " is not running!");
246
247       log.info("{} - Stopping", id);
248       consumer.wakeup();
249       condition.await();
250       log.info("{} - Stopped - consumed {} messages so far", id, consumed);
251     }
252     finally
253     {
254       lock.unlock();
255     }
256   }
257
258   @PreDestroy
259   public void destroy() throws ExecutionException, InterruptedException
260   {
261     log.info("{} - Destroy!", id);
262     log.info("{}: Consumed {} messages in total, exiting!", id, consumed);
263   }
264
265   public boolean running()
266   {
267     lock.lock();
268     try
269     {
270       return running;
271     }
272     finally
273     {
274       lock.unlock();
275     }
276   }
277
278   public Optional<Exception> exitStatus()
279   {
280     lock.lock();
281     try
282     {
283       if (running)
284         throw new IllegalStateException("No exit-status available: Consumer instance " + id + " is running!");
285
286       return Optional.ofNullable(exception);
287     }
288     finally
289     {
290       lock.unlock();
291     }
292   }
293 }