Merge branch 'endless-stream-consumer' into rebalance-listener
[demos/kafka/training] / src / main / java / de / juplo / kafka / EndlessConsumer.java
1 package de.juplo.kafka;
2
3 import lombok.extern.slf4j.Slf4j;
4 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
5 import org.apache.kafka.clients.consumer.ConsumerRecord;
6 import org.apache.kafka.clients.consumer.ConsumerRecords;
7 import org.apache.kafka.clients.consumer.KafkaConsumer;
8 import org.apache.kafka.common.TopicPartition;
9 import org.apache.kafka.common.errors.WakeupException;
10 import org.apache.kafka.common.serialization.StringDeserializer;
11
12 import javax.annotation.PreDestroy;
13 import java.time.Duration;
14 import java.util.*;
15 import java.util.concurrent.ExecutionException;
16 import java.util.concurrent.ExecutorService;
17 import java.util.concurrent.locks.Condition;
18 import java.util.concurrent.locks.Lock;
19 import java.util.concurrent.locks.ReentrantLock;
20
21
22 @Slf4j
23 public class EndlessConsumer implements Runnable
24 {
25   private final ExecutorService executor;
26   private final String bootstrapServer;
27   private final String groupId;
28   private final String id;
29   private final String topic;
30   private final String autoOffsetReset;
31
32   private final Lock lock = new ReentrantLock();
33   private final Condition condition = lock.newCondition();
34   private boolean running = false;
35   private Exception exception;
36   private long consumed = 0;
37   private KafkaConsumer<String, String> consumer = null;
38
39
40   private final Map<Integer, Map<String, Integer>> seen = new HashMap<>();
41
42
43   public EndlessConsumer(
44       ExecutorService executor,
45       String bootstrapServer,
46       String groupId,
47       String clientId,
48       String topic,
49       String autoOffsetReset)
50   {
51     this.executor = executor;
52     this.bootstrapServer = bootstrapServer;
53     this.groupId = groupId;
54     this.id = clientId;
55     this.topic = topic;
56     this.autoOffsetReset = autoOffsetReset;
57   }
58
59   @Override
60   public void run()
61   {
62     try
63     {
64       Properties props = new Properties();
65       props.put("bootstrap.servers", bootstrapServer);
66       props.put("group.id", groupId);
67       props.put("client.id", id);
68       props.put("auto.offset.reset", autoOffsetReset);
69       props.put("metadata.max.age.ms", "1000");
70       props.put("key.deserializer", StringDeserializer.class.getName());
71       props.put("value.deserializer", StringDeserializer.class.getName());
72
73       this.consumer = new KafkaConsumer<>(props);
74
75       log.info("{} - Subscribing to topic {}", id, topic);
76       consumer.subscribe(Arrays.asList(topic), new ConsumerRebalanceListener()
77       {
78         @Override
79         public void onPartitionsRevoked(Collection<TopicPartition> partitions)
80         {
81           partitions.forEach(tp ->
82           {
83             log.info("{} - removing partition: {}", id, tp);
84             Map<String, Integer> removed = seen.remove(tp.partition());
85             for (String key : removed.keySet())
86             {
87               log.info(
88                   "{} - Seen {} messages for partition={}|key={}",
89                   id,
90                   removed.get(key),
91                   tp.partition(),
92                   key);
93             }
94           });
95         }
96
97         @Override
98         public void onPartitionsAssigned(Collection<TopicPartition> partitions)
99         {
100           partitions.forEach(tp ->
101           {
102             log.info("{} - adding partition: {}", id, tp);
103             seen.put(tp.partition(), new HashMap<>());
104           });
105         }
106       });
107
108       while (true)
109       {
110         ConsumerRecords<String, String> records =
111             consumer.poll(Duration.ofSeconds(1));
112
113         // Do something with the data...
114         log.info("{} - Received {} messages", id, records.count());
115         for (ConsumerRecord<String, String> record : records)
116         {
117           consumed++;
118           log.info(
119               "{} - {}: {}/{} - {}={}",
120               id,
121               record.offset(),
122               record.topic(),
123               record.partition(),
124               record.key(),
125               record.value()
126           );
127
128           Integer partition = record.partition();
129           String key = record.key() == null ? "NULL" : record.key();
130           Map<String, Integer> byKey = seen.get(partition);
131
132           if (!byKey.containsKey(key))
133             byKey.put(key, 0);
134
135           int seenByKey = byKey.get(key);
136           seenByKey++;
137           byKey.put(key, seenByKey);
138         }
139       }
140     }
141     catch(WakeupException e)
142     {
143       log.info("{} - RIIING!", id);
144       shutdown();
145     }
146     catch(Exception e)
147     {
148       log.error("{} - Unexpected error: {}", id, e.toString(), e);
149       shutdown(e);
150     }
151     finally
152     {
153       log.info("{} - Closing the KafkaConsumer", id);
154       consumer.close();
155       log.info("{} - Consumer-Thread exiting", id);
156     }
157   }
158
159   private void shutdown()
160   {
161     shutdown(null);
162   }
163
164   private void shutdown(Exception e)
165   {
166     lock.lock();
167     try
168     {
169       running = false;
170       exception = e;
171       condition.signal();
172     }
173     finally
174     {
175       lock.unlock();
176     }
177   }
178
179   public Map<Integer, Map<String, Integer>> getSeen()
180   {
181     return seen;
182   }
183
184   public void start()
185   {
186     lock.lock();
187     try
188     {
189       if (running)
190         throw new IllegalStateException("Consumer instance " + id + " is already running!");
191
192       log.info("{} - Starting - consumed {} messages before", id, consumed);
193       running = true;
194       exception = null;
195       executor.submit(this);
196     }
197     finally
198     {
199       lock.unlock();
200     }
201   }
202
203   public synchronized void stop() throws ExecutionException, InterruptedException
204   {
205     lock.lock();
206     try
207     {
208       if (!running)
209         throw new IllegalStateException("Consumer instance " + id + " is not running!");
210
211       log.info("{} - Stopping", id);
212       consumer.wakeup();
213       condition.await();
214       log.info("{} - Stopped - consumed {} messages so far", id, consumed);
215     }
216     finally
217     {
218       lock.unlock();
219     }
220   }
221
222   @PreDestroy
223   public void destroy() throws ExecutionException, InterruptedException
224   {
225     log.info("{} - Destroy!", id);
226     try
227     {
228       stop();
229     }
230     catch (IllegalStateException e)
231     {
232       log.info("{} - Was already stopped", id);
233     }
234     catch (Exception e)
235     {
236       log.error("{} - Unexpected exception while trying to stop the consumer", id, e);
237     }
238     finally
239     {
240       log.info("{}: Consumed {} messages in total, exiting!", id, consumed);
241     }
242   }
243
244   public boolean running()
245   {
246     lock.lock();
247     try
248     {
249       return running;
250     }
251     finally
252     {
253       lock.unlock();
254     }
255   }
256
257   public Optional<Exception> exitStatus()
258   {
259     lock.lock();
260     try
261     {
262       if (running)
263         throw new IllegalStateException("No exit-status available: Consumer instance " + id + " is running!");
264
265       return Optional.ofNullable(exception);
266     }
267     finally
268     {
269       lock.unlock();
270     }
271   }
272 }