00678c45cf344dd43e5e4d5b5bdcbea74bd1a75d
[demos/kafka/training] / src / main / java / de / juplo / kafka / EndlessConsumer.java
1 package de.juplo.kafka;
2
3 import lombok.RequiredArgsConstructor;
4 import lombok.extern.slf4j.Slf4j;
5 import org.apache.kafka.clients.consumer.*;
6 import org.apache.kafka.common.TopicPartition;
7 import org.apache.kafka.common.errors.RecordDeserializationException;
8 import org.apache.kafka.common.errors.WakeupException;
9
10 import javax.annotation.PreDestroy;
11 import java.time.Duration;
12 import java.util.*;
13 import java.util.concurrent.ExecutionException;
14 import java.util.concurrent.ExecutorService;
15 import java.util.concurrent.locks.Condition;
16 import java.util.concurrent.locks.Lock;
17 import java.util.concurrent.locks.ReentrantLock;
18
19
20 @Slf4j
21 @RequiredArgsConstructor
22 public class EndlessConsumer<K, V> implements Runnable
23 {
24   private final ExecutorService executor;
25   private final String id;
26   private final String topic;
27   private final Consumer<K, V> consumer;
28   private final ConsumerRebalanceListener rebalanceListener;
29   private final RecordHandler<K, V> recordHandler;
30
31   private final Lock lock = new ReentrantLock();
32   private final Condition condition = lock.newCondition();
33   private boolean running = false;
34   private Exception exception;
35   private long consumed = 0;
36
37
38
39   @Override
40   public void run()
41   {
42     try
43     {
44       log.info("{} - Subscribing to topic {}", id, topic);
45       consumer.subscribe(Arrays.asList(topic), rebalanceListener);
46
47       while (true)
48       {
49         ConsumerRecords<K, V> records =
50             consumer.poll(Duration.ofSeconds(1));
51
52         // Do something with the data...
53         log.info("{} - Received {} messages", id, records.count());
54         for (ConsumerRecord<K, V> record : records)
55         {
56           log.info(
57               "{} - {}: {}/{} - {}={}",
58               id,
59               record.offset(),
60               record.topic(),
61               record.partition(),
62               record.key(),
63               record.value()
64           );
65
66           recordHandler.accept(record);
67
68           consumed++;
69         }
70       }
71     }
72     catch(WakeupException e)
73     {
74       log.info("{} - RIIING! Request to stop consumption - commiting current offsets!", id);
75       consumer.commitSync();
76       shutdown();
77     }
78     catch(RecordDeserializationException e)
79     {
80       TopicPartition tp = e.topicPartition();
81       long offset = e.offset();
82       log.error(
83           "{} - Could not deserialize  message on topic {} with offset={}: {}",
84           id,
85           tp,
86           offset,
87           e.getCause().toString());
88
89       consumer.commitSync();
90       shutdown(e);
91     }
92     catch(Exception e)
93     {
94       log.error("{} - Unexpected error: {}", id, e.toString(), e);
95       shutdown(e);
96     }
97     finally
98     {
99       log.info("{} - Consumer-Thread exiting", id);
100     }
101   }
102
103   private void shutdown()
104   {
105     shutdown(null);
106   }
107
108   private void shutdown(Exception e)
109   {
110     lock.lock();
111     try
112     {
113       try
114       {
115         log.info("{} - Unsubscribing from topic {}", id, topic);
116         consumer.unsubscribe();
117       }
118       catch (Exception ue)
119       {
120         log.error(
121             "{} - Error while unsubscribing from topic {}: {}",
122             id,
123             topic,
124             ue.toString());
125       }
126       finally
127       {
128         running = false;
129         exception = e;
130         condition.signal();
131       }
132     }
133     finally
134     {
135       lock.unlock();
136     }
137   }
138
139   public void start()
140   {
141     lock.lock();
142     try
143     {
144       if (running)
145         throw new IllegalStateException("Consumer instance " + id + " is already running!");
146
147       log.info("{} - Starting - consumed {} messages before", id, consumed);
148       running = true;
149       exception = null;
150       executor.submit(this);
151     }
152     finally
153     {
154       lock.unlock();
155     }
156   }
157
158   public synchronized void stop() throws InterruptedException
159   {
160     lock.lock();
161     try
162     {
163       if (!running)
164         throw new IllegalStateException("Consumer instance " + id + " is not running!");
165
166       log.info("{} - Stopping", id);
167       consumer.wakeup();
168       condition.await();
169       log.info("{} - Stopped - consumed {} messages so far", id, consumed);
170     }
171     finally
172     {
173       lock.unlock();
174     }
175   }
176
177   @PreDestroy
178   public void destroy() throws ExecutionException, InterruptedException
179   {
180     log.info("{} - Destroy!", id);
181     log.info("{}: Consumed {} messages in total, exiting!", id, consumed);
182   }
183
184   public boolean running()
185   {
186     lock.lock();
187     try
188     {
189       return running;
190     }
191     finally
192     {
193       lock.unlock();
194     }
195   }
196
197   public Optional<Exception> exitStatus()
198   {
199     lock.lock();
200     try
201     {
202       if (running)
203         throw new IllegalStateException("No exit-status available: Consumer instance " + id + " is running!");
204
205       return Optional.ofNullable(exception);
206     }
207     finally
208     {
209       lock.unlock();
210     }
211   }
212 }