Merge branch 'endless-stream-consumer' into rebalance-listener
[demos/kafka/training] / src / main / java / de / juplo / kafka / EndlessConsumer.java
1 package de.juplo.kafka;
2
3 import lombok.extern.slf4j.Slf4j;
4 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
5 import org.apache.kafka.clients.consumer.ConsumerRecord;
6 import org.apache.kafka.clients.consumer.ConsumerRecords;
7 import org.apache.kafka.clients.consumer.KafkaConsumer;
8 import org.apache.kafka.common.TopicPartition;
9 import org.apache.kafka.common.errors.WakeupException;
10 import org.apache.kafka.common.serialization.StringDeserializer;
11
12 import javax.annotation.PreDestroy;
13 import java.time.Duration;
14 import java.util.*;
15 import java.util.concurrent.ExecutionException;
16 import java.util.concurrent.ExecutorService;
17 import java.util.concurrent.locks.Condition;
18 import java.util.concurrent.locks.Lock;
19 import java.util.concurrent.locks.ReentrantLock;
20
21
22 @Slf4j
23 public class EndlessConsumer implements Runnable
24 {
25   private final ExecutorService executor;
26   private final String bootstrapServer;
27   private final String groupId;
28   private final String id;
29   private final String topic;
30   private final String autoOffsetReset;
31
32   private final Lock lock = new ReentrantLock();
33   private final Condition condition = lock.newCondition();
34   private boolean running = false;
35   private long consumed = 0;
36   private KafkaConsumer<String, String> consumer = null;
37
38
39   private final Map<Integer, Map<String, Integer>> seen = new HashMap<>();
40
41
42   public EndlessConsumer(
43       ExecutorService executor,
44       String bootstrapServer,
45       String groupId,
46       String clientId,
47       String topic,
48       String autoOffsetReset)
49   {
50     this.executor = executor;
51     this.bootstrapServer = bootstrapServer;
52     this.groupId = groupId;
53     this.id = clientId;
54     this.topic = topic;
55     this.autoOffsetReset = autoOffsetReset;
56   }
57
58   @Override
59   public void run()
60   {
61     try
62     {
63       Properties props = new Properties();
64       props.put("bootstrap.servers", bootstrapServer);
65       props.put("group.id", groupId);
66       props.put("client.id", id);
67       props.put("auto.offset.reset", autoOffsetReset);
68       props.put("metadata.max.age.ms", "1000");
69       props.put("key.deserializer", StringDeserializer.class.getName());
70       props.put("value.deserializer", StringDeserializer.class.getName());
71
72       this.consumer = new KafkaConsumer<>(props);
73
74       log.info("{} - Subscribing to topic {}", id, topic);
75       consumer.subscribe(Arrays.asList(topic), new ConsumerRebalanceListener()
76       {
77         @Override
78         public void onPartitionsRevoked(Collection<TopicPartition> partitions)
79         {
80           partitions.forEach(tp ->
81           {
82             log.info("{} - removing partition: {}", id, tp);
83             Map<String, Integer> removed = seen.remove(tp.partition());
84             for (String key : removed.keySet())
85             {
86               log.info(
87                   "{} - Seen {} messages for partition={}|key={}",
88                   id,
89                   removed.get(key),
90                   tp.partition(),
91                   key);
92             }
93           });
94         }
95
96         @Override
97         public void onPartitionsAssigned(Collection<TopicPartition> partitions)
98         {
99           partitions.forEach(tp ->
100           {
101             log.info("{} - adding partition: {}", id, tp);
102             seen.put(tp.partition(), new HashMap<>());
103           });
104         }
105       });
106
107       while (true)
108       {
109         ConsumerRecords<String, String> records =
110             consumer.poll(Duration.ofSeconds(1));
111
112         // Do something with the data...
113         log.info("{} - Received {} messages", id, records.count());
114         for (ConsumerRecord<String, String> record : records)
115         {
116           consumed++;
117           log.info(
118               "{} - {}: {}/{} - {}={}",
119               id,
120               record.offset(),
121               record.topic(),
122               record.partition(),
123               record.key(),
124               record.value()
125           );
126
127           Integer partition = record.partition();
128           String key = record.key() == null ? "NULL" : record.key();
129           Map<String, Integer> byKey = seen.get(partition);
130
131           if (!byKey.containsKey(key))
132             byKey.put(key, 0);
133
134           int seenByKey = byKey.get(key);
135           seenByKey++;
136           byKey.put(key, seenByKey);
137         }
138       }
139     }
140     catch(WakeupException e)
141     {
142       log.info("{} - RIIING!", id);
143       shutdown();
144     }
145     catch(Exception e)
146     {
147       log.error("{} - Unexpected error: {}", id, e.toString(), e);
148       shutdown();
149     }
150     finally
151     {
152       log.info("{} - Closing the KafkaConsumer", id);
153       consumer.close();
154       log.info("{} - Consumer-Thread exiting", id);
155     }
156   }
157
158   private void shutdown()
159   {
160     lock.lock();
161     try
162     {
163       running = false;
164       condition.signal();
165     }
166     finally
167     {
168       lock.unlock();
169     }
170   }
171
172   public Map<Integer, Map<String, Integer>> getSeen()
173   {
174     return seen;
175   }
176
177   public void start()
178   {
179     lock.lock();
180     try
181     {
182       if (running)
183         throw new IllegalStateException("Consumer instance " + id + " is already running!");
184
185       log.info("{} - Starting - consumed {} messages before", id, consumed);
186       running = true;
187       executor.submit(this);
188     }
189     finally
190     {
191       lock.unlock();
192     }
193   }
194
195   public synchronized void stop() throws ExecutionException, InterruptedException
196   {
197     lock.lock();
198     try
199     {
200       if (!running)
201         throw new IllegalStateException("Consumer instance " + id + " is not running!");
202
203       log.info("{} - Stopping", id);
204       consumer.wakeup();
205       condition.await();
206       log.info("{} - Stopped - consumed {} messages so far", id, consumed);
207     }
208     finally
209     {
210       lock.unlock();
211     }
212   }
213
214   @PreDestroy
215   public void destroy() throws ExecutionException, InterruptedException
216   {
217     log.info("{} - Destroy!", id);
218     try
219     {
220       stop();
221     }
222     catch (IllegalStateException e)
223     {
224       log.info("{} - Was already stopped", id);
225     }
226     catch (Exception e)
227     {
228       log.error("{} - Unexpected exception while trying to stop the consumer", id, e);
229     }
230     finally
231     {
232       log.info("{}: Consumed {} messages in total, exiting!", id, consumed);
233     }
234   }
235 }