Refaktorisierung für Tests - Record-Handler als Bean konfigurierbar
[demos/kafka/training] / src / main / java / de / juplo / kafka / EndlessConsumer.java
1 package de.juplo.kafka;
2
3 import lombok.RequiredArgsConstructor;
4 import lombok.extern.slf4j.Slf4j;
5 import org.apache.kafka.clients.consumer.*;
6 import org.apache.kafka.common.TopicPartition;
7 import org.apache.kafka.common.errors.WakeupException;
8
9 import javax.annotation.PreDestroy;
10 import java.time.Duration;
11 import java.util.*;
12 import java.util.concurrent.ExecutionException;
13 import java.util.concurrent.ExecutorService;
14 import java.util.concurrent.locks.Condition;
15 import java.util.concurrent.locks.Lock;
16 import java.util.concurrent.locks.ReentrantLock;
17
18
19 @Slf4j
20 @RequiredArgsConstructor
21 public class EndlessConsumer implements Runnable
22 {
23   private final ExecutorService executor;
24   private final String id;
25   private final String topic;
26   private final Consumer<String, String> consumer;
27   private final java.util.function.Consumer<ConsumerRecord<String, String>> handler;
28
29   private final Lock lock = new ReentrantLock();
30   private final Condition condition = lock.newCondition();
31   private boolean running = false;
32   private Exception exception;
33   private long consumed = 0;
34
35   private final Map<Integer, Map<String, Long>> seen = new HashMap<>();
36   private final Map<Integer, Long> offsets = new HashMap<>();
37
38
39   @Override
40   public void run()
41   {
42     try
43     {
44       log.info("{} - Subscribing to topic {}", id, topic);
45       consumer.subscribe(Arrays.asList(topic), new ConsumerRebalanceListener()
46       {
47         @Override
48         public void onPartitionsRevoked(Collection<TopicPartition> partitions)
49         {
50           partitions.forEach(tp ->
51           {
52             Integer partition = tp.partition();
53             Long newOffset = consumer.position(tp);
54             Long oldOffset = offsets.remove(partition);
55             log.info(
56                 "{} - removing partition: {}, consumed {} records (offset {} -> {})",
57                 id,
58                 partition,
59                 newOffset - oldOffset,
60                 oldOffset,
61                 newOffset);
62             Map<String, Long> removed = seen.remove(partition);
63             for (String key : removed.keySet())
64             {
65               log.info(
66                   "{} - Seen {} messages for partition={}|key={}",
67                   id,
68                   removed.get(key),
69                   partition,
70                   key);
71             }
72           });
73         }
74
75         @Override
76         public void onPartitionsAssigned(Collection<TopicPartition> partitions)
77         {
78           partitions.forEach(tp ->
79           {
80             Integer partition = tp.partition();
81             Long offset = consumer.position(tp);
82             log.info("{} - adding partition: {}, offset={}", id, partition, offset);
83             offsets.put(partition, offset);
84             seen.put(partition, new HashMap<>());
85           });
86         }
87       });
88
89       while (true)
90       {
91         ConsumerRecords<String, String> records =
92             consumer.poll(Duration.ofSeconds(1));
93
94         // Do something with the data...
95         log.info("{} - Received {} messages", id, records.count());
96         for (ConsumerRecord<String, String> record : records)
97         {
98           log.info(
99               "{} - {}: {}/{} - {}={}",
100               id,
101               record.offset(),
102               record.topic(),
103               record.partition(),
104               record.key(),
105               record.value()
106           );
107
108           handler.accept(record);
109
110           consumed++;
111
112           Integer partition = record.partition();
113           String key = record.key() == null ? "NULL" : record.key();
114           Map<String, Long> byKey = seen.get(partition);
115
116           if (!byKey.containsKey(key))
117             byKey.put(key, 0l);
118
119           long seenByKey = byKey.get(key);
120           seenByKey++;
121           byKey.put(key, seenByKey);
122         }
123       }
124     }
125     catch(WakeupException e)
126     {
127       log.info("{} - RIIING! Request to stop consumption - commiting current offsets!", id);
128       consumer.commitSync();
129       shutdown();
130     }
131     catch(Exception e)
132     {
133       log.error("{} - Unexpected error: {}", id, e.toString(), e);
134       shutdown(e);
135     }
136     finally
137     {
138       log.info("{} - Consumer-Thread exiting", id);
139     }
140   }
141
142   private void shutdown()
143   {
144     shutdown(null);
145   }
146
147   private void shutdown(Exception e)
148   {
149     lock.lock();
150     try
151     {
152       try
153       {
154         log.info("{} - Unsubscribing from topic {}", id, topic);
155         consumer.unsubscribe();
156       }
157       catch (Exception ue)
158       {
159         log.error(
160             "{} - Error while unsubscribing from topic {}: {}",
161             id,
162             topic,
163             ue.toString());
164       }
165       finally
166       {
167         running = false;
168         exception = e;
169         condition.signal();
170       }
171     }
172     finally
173     {
174       lock.unlock();
175     }
176   }
177
178   public Map<Integer, Map<String, Long>> getSeen()
179   {
180     return seen;
181   }
182
183   public void start()
184   {
185     lock.lock();
186     try
187     {
188       if (running)
189         throw new IllegalStateException("Consumer instance " + id + " is already running!");
190
191       log.info("{} - Starting - consumed {} messages before", id, consumed);
192       running = true;
193       exception = null;
194       executor.submit(this);
195     }
196     finally
197     {
198       lock.unlock();
199     }
200   }
201
202   public synchronized void stop() throws ExecutionException, InterruptedException
203   {
204     lock.lock();
205     try
206     {
207       if (!running)
208         throw new IllegalStateException("Consumer instance " + id + " is not running!");
209
210       log.info("{} - Stopping", id);
211       consumer.wakeup();
212       condition.await();
213       log.info("{} - Stopped - consumed {} messages so far", id, consumed);
214     }
215     finally
216     {
217       lock.unlock();
218     }
219   }
220
221   @PreDestroy
222   public void destroy() throws ExecutionException, InterruptedException
223   {
224     log.info("{} - Destroy!", id);
225     try
226     {
227       stop();
228     }
229     catch (IllegalStateException e)
230     {
231       log.info("{} - Was already stopped", id);
232     }
233     catch (Exception e)
234     {
235       log.error("{} - Unexpected exception while trying to stop the consumer", id, e);
236     }
237     finally
238     {
239       log.info("{}: Consumed {} messages in total, exiting!", id, consumed);
240     }
241   }
242
243   public boolean running()
244   {
245     lock.lock();
246     try
247     {
248       return running;
249     }
250     finally
251     {
252       lock.unlock();
253     }
254   }
255
256   public Optional<Exception> exitStatus()
257   {
258     lock.lock();
259     try
260     {
261       if (running)
262         throw new IllegalStateException("No exit-status available: Consumer instance " + id + " is running!");
263
264       return Optional.ofNullable(exception);
265     }
266     finally
267     {
268       lock.unlock();
269     }
270   }
271 }