Merge branch 'deserialization' into sumup-adder--ohne--stored-offsets
[demos/kafka/training] / src / main / java / de / juplo / kafka / EndlessConsumer.java
1 package de.juplo.kafka;
2
3 import lombok.RequiredArgsConstructor;
4 import lombok.extern.slf4j.Slf4j;
5 import org.apache.kafka.clients.consumer.Consumer;
6 import org.apache.kafka.clients.consumer.ConsumerRecord;
7 import org.apache.kafka.clients.consumer.ConsumerRecords;
8 import org.apache.kafka.common.TopicPartition;
9 import org.apache.kafka.common.errors.RecordDeserializationException;
10 import org.apache.kafka.common.errors.WakeupException;
11
12 import javax.annotation.PreDestroy;
13 import java.time.Duration;
14 import java.util.*;
15 import java.util.concurrent.ExecutionException;
16 import java.util.concurrent.ExecutorService;
17 import java.util.concurrent.locks.Condition;
18 import java.util.concurrent.locks.Lock;
19 import java.util.concurrent.locks.ReentrantLock;
20
21
22 @Slf4j
23 @RequiredArgsConstructor
24 public class EndlessConsumer<K, V> implements Runnable
25 {
26   private final ExecutorService executor;
27   private final String id;
28   private final String topic;
29   private final Consumer<K, V> consumer;
30   private final RebalanceListener rebalanceListener;
31   private final RecordHandler<K, V> recordHandler;
32
33   private final Lock lock = new ReentrantLock();
34   private final Condition condition = lock.newCondition();
35   private boolean running = false;
36   private Exception exception;
37   private long consumed = 0;
38
39
40
41   @Override
42   public void run()
43   {
44     try
45     {
46       log.info("{} - Subscribing to topic {}", id, topic);
47       rebalanceListener.enableCommits();
48       consumer.subscribe(Arrays.asList(topic), rebalanceListener);
49
50       while (true)
51       {
52         ConsumerRecords<K, V> records =
53             consumer.poll(Duration.ofSeconds(1));
54
55         // Do something with the data...
56         log.info("{} - Received {} messages", id, records.count());
57         for (ConsumerRecord<K, V> record : records)
58         {
59           log.info(
60               "{} - {}: {}/{} - {}={}",
61               id,
62               record.offset(),
63               record.topic(),
64               record.partition(),
65               record.key(),
66               record.value()
67           );
68
69           recordHandler.accept(record);
70
71           consumed++;
72         }
73       }
74     }
75     catch(WakeupException e)
76     {
77       log.info("{} - RIIING! Request to stop consumption.", id);
78       shutdown();
79     }
80     catch(RecordDeserializationException e)
81     {
82       TopicPartition tp = e.topicPartition();
83       long offset = e.offset();
84       log.error(
85           "{} - Could not deserialize  message on topic {} with offset={}: {}",
86           id,
87           tp,
88           offset,
89           e.getCause().toString());
90
91       shutdown(e);
92     }
93     catch(Exception e)
94     {
95       log.error("{} - Unexpected error: {}, disabling commits", id, e.toString(), e);
96       rebalanceListener.disableCommits();
97       shutdown(e);
98     }
99     finally
100     {
101       log.info("{} - Consumer-Thread exiting", id);
102     }
103   }
104
105   private void shutdown()
106   {
107     shutdown(null);
108   }
109
110   private void shutdown(Exception e)
111   {
112     lock.lock();
113     try
114     {
115       try
116       {
117         log.info("{} - Unsubscribing from topic {}", id, topic);
118         consumer.unsubscribe();
119       }
120       catch (Exception ue)
121       {
122         log.error(
123             "{} - Error while unsubscribing from topic {}: {}",
124             id,
125             topic,
126             ue.toString());
127       }
128       finally
129       {
130         running = false;
131         exception = e;
132         condition.signal();
133       }
134     }
135     finally
136     {
137       lock.unlock();
138     }
139   }
140
141   public void start()
142   {
143     lock.lock();
144     try
145     {
146       if (running)
147         throw new IllegalStateException("Consumer instance " + id + " is already running!");
148
149       log.info("{} - Starting - consumed {} messages before", id, consumed);
150       running = true;
151       exception = null;
152       executor.submit(this);
153     }
154     finally
155     {
156       lock.unlock();
157     }
158   }
159
160   public synchronized void stop() throws InterruptedException
161   {
162     lock.lock();
163     try
164     {
165       if (!running)
166         throw new IllegalStateException("Consumer instance " + id + " is not running!");
167
168       log.info("{} - Stopping", id);
169       consumer.wakeup();
170       condition.await();
171       log.info("{} - Stopped - consumed {} messages so far", id, consumed);
172     }
173     finally
174     {
175       lock.unlock();
176     }
177   }
178
179   @PreDestroy
180   public void destroy() throws ExecutionException, InterruptedException
181   {
182     log.info("{} - Destroy!", id);
183     log.info("{}: Consumed {} messages in total, exiting!", id, consumed);
184   }
185
186   public boolean running()
187   {
188     lock.lock();
189     try
190     {
191       return running;
192     }
193     finally
194     {
195       lock.unlock();
196     }
197   }
198
199   public Optional<Exception> exitStatus()
200   {
201     lock.lock();
202     try
203     {
204       if (running)
205         throw new IllegalStateException("No exit-status available: Consumer instance " + id + " is running!");
206
207       return Optional.ofNullable(exception);
208     }
209     finally
210     {
211       lock.unlock();
212     }
213   }
214 }