Merge der überarbeiteten Compose-Konfiguration ('endless-stream-consumer')
[demos/kafka/training] / src / main / java / de / juplo / kafka / EndlessConsumer.java
1 package de.juplo.kafka;
2
3 import lombok.extern.slf4j.Slf4j;
4 import org.apache.kafka.clients.consumer.ConsumerRecord;
5 import org.apache.kafka.clients.consumer.ConsumerRecords;
6 import org.apache.kafka.clients.consumer.KafkaConsumer;
7 import org.apache.kafka.common.errors.WakeupException;
8 import org.apache.kafka.common.serialization.StringDeserializer;
9
10 import javax.annotation.PreDestroy;
11 import java.time.Duration;
12 import java.util.Arrays;
13 import java.util.HashMap;
14 import java.util.Map;
15 import java.util.Optional;
16 import java.util.Properties;
17 import java.util.concurrent.ExecutionException;
18 import java.util.concurrent.ExecutorService;
19 import java.util.concurrent.locks.Condition;
20 import java.util.concurrent.locks.Lock;
21 import java.util.concurrent.locks.ReentrantLock;
22
23
24 @Slf4j
25 public class EndlessConsumer implements Runnable
26 {
27   private final ExecutorService executor;
28   private final String bootstrapServer;
29   private final String groupId;
30   private final String id;
31   private final String topic;
32   private final String autoOffsetReset;
33
34   private final Lock lock = new ReentrantLock();
35   private final Condition condition = lock.newCondition();
36   private boolean running = false;
37   private Exception exception;
38   private long consumed = 0;
39   private KafkaConsumer<String, String> consumer = null;
40
41
42   private Map<Integer, Map<String, Integer>> seen;
43
44
45   public EndlessConsumer(
46       ExecutorService executor,
47       String bootstrapServer,
48       String groupId,
49       String clientId,
50       String topic,
51       String autoOffsetReset)
52   {
53     this.executor = executor;
54     this.bootstrapServer = bootstrapServer;
55     this.groupId = groupId;
56     this.id = clientId;
57     this.topic = topic;
58     this.autoOffsetReset = autoOffsetReset;
59   }
60
61   @Override
62   public void run()
63   {
64     try
65     {
66       Properties props = new Properties();
67       props.put("bootstrap.servers", bootstrapServer);
68       props.put("group.id", groupId);
69       props.put("client.id", id);
70       props.put("auto.offset.reset", autoOffsetReset);
71       props.put("metadata.max.age.ms", "1000");
72       props.put("key.deserializer", StringDeserializer.class.getName());
73       props.put("value.deserializer", StringDeserializer.class.getName());
74
75       this.consumer = new KafkaConsumer<>(props);
76
77       log.info("{} - Subscribing to topic {}", id, topic);
78       consumer.subscribe(Arrays.asList(topic));
79
80       seen = new HashMap<>();
81
82       while (true)
83       {
84         ConsumerRecords<String, String> records =
85             consumer.poll(Duration.ofSeconds(1));
86
87         // Do something with the data...
88         log.info("{} - Received {} messages", id, records.count());
89         for (ConsumerRecord<String, String> record : records)
90         {
91           consumed++;
92           log.info(
93               "{} - {}: {}/{} - {}={}",
94               id,
95               record.offset(),
96               record.topic(),
97               record.partition(),
98               record.key(),
99               record.value()
100           );
101
102           Integer partition = record.partition();
103           String key = record.key() == null ? "NULL" : record.key();
104
105           if (!seen.containsKey(partition))
106             seen.put(partition, new HashMap<>());
107
108           Map<String, Integer> byKey = seen.get(partition);
109
110           if (!byKey.containsKey(key))
111             byKey.put(key, 0);
112
113           int seenByKey = byKey.get(key);
114           seenByKey++;
115           byKey.put(key, seenByKey);
116         }
117       }
118     }
119     catch(WakeupException e)
120     {
121       log.info("{} - RIIING!", id);
122       shutdown();
123     }
124     catch(Exception e)
125     {
126       log.error("{} - Unexpected error: {}", id, e.toString(), e);
127       shutdown(e);
128     }
129     finally
130     {
131       log.info("{} - Closing the KafkaConsumer", id);
132       consumer.close();
133
134       for (Integer partition : seen.keySet())
135       {
136         Map<String, Integer> byKey = seen.get(partition);
137         for (String key : byKey.keySet())
138         {
139           log.info(
140               "{} - Seen {} messages for partition={}|key={}",
141               id,
142               byKey.get(key),
143               partition,
144               key);
145         }
146       }
147       seen = null;
148
149       log.info("{} - Consumer-Thread exiting", id);
150     }
151   }
152
153   public Map<Integer, Map<String, Integer>> getSeen()
154   {
155     return seen;
156   }
157
158   private void shutdown()
159   {
160     shutdown(null);
161   }
162
163   private void shutdown(Exception e)
164   {
165     lock.lock();
166     try
167     {
168       running = false;
169       exception = e;
170       condition.signal();
171     }
172     finally
173     {
174       lock.unlock();
175     }
176   }
177
178   public void start()
179   {
180     lock.lock();
181     try
182     {
183       if (running)
184         throw new IllegalStateException("Consumer instance " + id + " is already running!");
185
186       log.info("{} - Starting - consumed {} messages before", id, consumed);
187       running = true;
188       exception = null;
189       executor.submit(this);
190     }
191     finally
192     {
193       lock.unlock();
194     }
195   }
196
197   public synchronized void stop() throws ExecutionException, InterruptedException
198   {
199     lock.lock();
200     try
201     {
202       if (!running)
203         throw new IllegalStateException("Consumer instance " + id + " is not running!");
204
205       log.info("{} - Stopping", id);
206       consumer.wakeup();
207       condition.await();
208       log.info("{} - Stopped - consumed {} messages so far", id, consumed);
209     }
210     finally
211     {
212       lock.unlock();
213     }
214   }
215
216   @PreDestroy
217   public void destroy() throws ExecutionException, InterruptedException
218   {
219     log.info("{} - Destroy!", id);
220     try
221     {
222       stop();
223     }
224     catch (IllegalStateException e)
225     {
226       log.info("{} - Was already stopped", id);
227     }
228     catch (Exception e)
229     {
230       log.error("{} - Unexpected exception while trying to stop the consumer", id, e);
231     }
232     finally
233     {
234       log.info("{}: Consumed {} messages in total, exiting!", id, consumed);
235     }
236   }
237
238   public boolean running()
239   {
240     lock.lock();
241     try
242     {
243       return running;
244     }
245     finally
246     {
247       lock.unlock();
248     }
249   }
250
251   public Optional<Exception> exitStatus()
252   {
253     lock.lock();
254     try
255     {
256       if (running)
257         throw new IllegalStateException("No exit-status available: Consumer instance " + id + " is running!");
258
259       return Optional.ofNullable(exception);
260     }
261     finally
262     {
263       lock.unlock();
264     }
265   }
266 }