Die Offsets werden ausgegeben, wenn eine Partition zugeteilt/entzogen wird
[demos/kafka/training] / src / main / java / de / juplo / kafka / EndlessConsumer.java
1 package de.juplo.kafka;
2
3 import lombok.extern.slf4j.Slf4j;
4 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
5 import org.apache.kafka.clients.consumer.ConsumerRecord;
6 import org.apache.kafka.clients.consumer.ConsumerRecords;
7 import org.apache.kafka.clients.consumer.KafkaConsumer;
8 import org.apache.kafka.common.TopicPartition;
9 import org.apache.kafka.common.errors.WakeupException;
10 import org.apache.kafka.common.serialization.StringDeserializer;
11
12 import javax.annotation.PreDestroy;
13 import java.time.Duration;
14 import java.util.*;
15 import java.util.concurrent.ExecutionException;
16 import java.util.concurrent.ExecutorService;
17 import java.util.concurrent.locks.Condition;
18 import java.util.concurrent.locks.Lock;
19 import java.util.concurrent.locks.ReentrantLock;
20
21
22 @Slf4j
23 public class EndlessConsumer implements Runnable
24 {
25   private final ExecutorService executor;
26   private final String bootstrapServer;
27   private final String groupId;
28   private final String id;
29   private final String topic;
30   private final String autoOffsetReset;
31
32   private final Lock lock = new ReentrantLock();
33   private final Condition condition = lock.newCondition();
34   private boolean running = false;
35   private Exception exception;
36   private long consumed = 0;
37   private KafkaConsumer<String, String> consumer = null;
38
39
40   private final Map<Integer, Map<String, Long>> seen = new HashMap<>();
41   private final Map<Integer, Long> offsets = new HashMap<>();
42
43
44   public EndlessConsumer(
45       ExecutorService executor,
46       String bootstrapServer,
47       String groupId,
48       String clientId,
49       String topic,
50       String autoOffsetReset)
51   {
52     this.executor = executor;
53     this.bootstrapServer = bootstrapServer;
54     this.groupId = groupId;
55     this.id = clientId;
56     this.topic = topic;
57     this.autoOffsetReset = autoOffsetReset;
58   }
59
60   @Override
61   public void run()
62   {
63     try
64     {
65       Properties props = new Properties();
66       props.put("bootstrap.servers", bootstrapServer);
67       props.put("group.id", groupId);
68       props.put("client.id", id);
69       props.put("auto.offset.reset", autoOffsetReset);
70       props.put("metadata.max.age.ms", "1000");
71       props.put("key.deserializer", StringDeserializer.class.getName());
72       props.put("value.deserializer", StringDeserializer.class.getName());
73
74       this.consumer = new KafkaConsumer<>(props);
75
76       log.info("{} - Subscribing to topic {}", id, topic);
77       consumer.subscribe(Arrays.asList(topic), new ConsumerRebalanceListener()
78       {
79         @Override
80         public void onPartitionsRevoked(Collection<TopicPartition> partitions)
81         {
82           partitions.forEach(tp ->
83           {
84             Integer partition = tp.partition();
85             Long newOffset = consumer.position(tp);
86             Long oldOffset = offsets.remove(partition);
87             log.info(
88                 "{} - removing partition: {}, consumed {} records (offset {} -> {})",
89                 id,
90                 partition,
91                 newOffset - oldOffset,
92                 oldOffset,
93                 newOffset);
94             Map<String, Long> removed = seen.remove(partition);
95             for (String key : removed.keySet())
96             {
97               log.info(
98                   "{} - Seen {} messages for partition={}|key={}",
99                   id,
100                   removed.get(key),
101                   partition,
102                   key);
103             }
104           });
105         }
106
107         @Override
108         public void onPartitionsAssigned(Collection<TopicPartition> partitions)
109         {
110           partitions.forEach(tp ->
111           {
112             Integer partition = tp.partition();
113             Long offset = consumer.position(tp);
114             log.info("{} - adding partition: {}, offset={}", id, partition, offset);
115             offsets.put(partition, offset);
116             seen.put(partition, new HashMap<>());
117           });
118         }
119       });
120
121       while (true)
122       {
123         ConsumerRecords<String, String> records =
124             consumer.poll(Duration.ofSeconds(1));
125
126         // Do something with the data...
127         log.info("{} - Received {} messages", id, records.count());
128         for (ConsumerRecord<String, String> record : records)
129         {
130           consumed++;
131           log.info(
132               "{} - {}: {}/{} - {}={}",
133               id,
134               record.offset(),
135               record.topic(),
136               record.partition(),
137               record.key(),
138               record.value()
139           );
140
141           Integer partition = record.partition();
142           String key = record.key() == null ? "NULL" : record.key();
143           Map<String, Long> byKey = seen.get(partition);
144
145           if (!byKey.containsKey(key))
146             byKey.put(key, 0l);
147
148           long seenByKey = byKey.get(key);
149           seenByKey++;
150           byKey.put(key, seenByKey);
151         }
152       }
153     }
154     catch(WakeupException e)
155     {
156       log.info("{} - RIIING!", id);
157       shutdown();
158     }
159     catch(Exception e)
160     {
161       log.error("{} - Unexpected error: {}", id, e.toString(), e);
162       shutdown(e);
163     }
164     finally
165     {
166       log.info("{} - Closing the KafkaConsumer", id);
167       consumer.close();
168       log.info("{} - Consumer-Thread exiting", id);
169     }
170   }
171
172   private void shutdown()
173   {
174     shutdown(null);
175   }
176
177   private void shutdown(Exception e)
178   {
179     lock.lock();
180     try
181     {
182       running = false;
183       exception = e;
184       condition.signal();
185     }
186     finally
187     {
188       lock.unlock();
189     }
190   }
191
192   public Map<Integer, Map<String, Long>> getSeen()
193   {
194     return seen;
195   }
196
197   public void start()
198   {
199     lock.lock();
200     try
201     {
202       if (running)
203         throw new IllegalStateException("Consumer instance " + id + " is already running!");
204
205       log.info("{} - Starting - consumed {} messages before", id, consumed);
206       running = true;
207       exception = null;
208       executor.submit(this);
209     }
210     finally
211     {
212       lock.unlock();
213     }
214   }
215
216   public synchronized void stop() throws ExecutionException, InterruptedException
217   {
218     lock.lock();
219     try
220     {
221       if (!running)
222         throw new IllegalStateException("Consumer instance " + id + " is not running!");
223
224       log.info("{} - Stopping", id);
225       consumer.wakeup();
226       condition.await();
227       log.info("{} - Stopped - consumed {} messages so far", id, consumed);
228     }
229     finally
230     {
231       lock.unlock();
232     }
233   }
234
235   @PreDestroy
236   public void destroy() throws ExecutionException, InterruptedException
237   {
238     log.info("{} - Destroy!", id);
239     try
240     {
241       stop();
242     }
243     catch (IllegalStateException e)
244     {
245       log.info("{} - Was already stopped", id);
246     }
247     catch (Exception e)
248     {
249       log.error("{} - Unexpected exception while trying to stop the consumer", id, e);
250     }
251     finally
252     {
253       log.info("{}: Consumed {} messages in total, exiting!", id, consumed);
254     }
255   }
256
257   public boolean running()
258   {
259     lock.lock();
260     try
261     {
262       return running;
263     }
264     finally
265     {
266       lock.unlock();
267     }
268   }
269
270   public Optional<Exception> exitStatus()
271   {
272     lock.lock();
273     try
274     {
275       if (running)
276         throw new IllegalStateException("No exit-status available: Consumer instance " + id + " is running!");
277
278       return Optional.ofNullable(exception);
279     }
280     finally
281     {
282       lock.unlock();
283     }
284   }
285 }