ROT: Merge der korrigierten Test-Logik deserialization -> into sumup-adder
[demos/kafka/training] / src / main / java / de / juplo / kafka / EndlessConsumer.java
1 package de.juplo.kafka;
2
3 import lombok.RequiredArgsConstructor;
4 import lombok.extern.slf4j.Slf4j;
5 import org.apache.kafka.clients.consumer.*;
6 import org.apache.kafka.common.TopicPartition;
7 import org.apache.kafka.common.errors.RecordDeserializationException;
8 import org.apache.kafka.common.errors.WakeupException;
9
10 import javax.annotation.PreDestroy;
11 import java.time.Duration;
12 import java.util.*;
13 import java.util.concurrent.ExecutionException;
14 import java.util.concurrent.ExecutorService;
15 import java.util.concurrent.locks.Condition;
16 import java.util.concurrent.locks.Lock;
17 import java.util.concurrent.locks.ReentrantLock;
18
19
20 @Slf4j
21 @RequiredArgsConstructor
22 public class EndlessConsumer<K, V> implements Runnable
23 {
24   private final ExecutorService executor;
25   private final String id;
26   private final String topic;
27   private final Consumer<K, V> consumer;
28   private final PollIntervalAwareConsumerRebalanceListener rebalanceListener;
29   private final RecordHandler<K, V> recordHandler;
30
31   private final Lock lock = new ReentrantLock();
32   private final Condition condition = lock.newCondition();
33   private boolean running = false;
34   private Exception exception;
35   private long consumed = 0;
36
37
38
39   @Override
40   public void run()
41   {
42     try
43     {
44       log.info("{} - Subscribing to topic {}", id, topic);
45       rebalanceListener.enableCommits();
46       consumer.subscribe(Arrays.asList(topic), rebalanceListener);
47
48       while (true)
49       {
50         ConsumerRecords<K, V> records =
51             consumer.poll(Duration.ofSeconds(1));
52
53         // Do something with the data...
54         log.info("{} - Received {} messages", id, records.count());
55         for (ConsumerRecord<K, V> record : records)
56         {
57           log.info(
58               "{} - {}: {}/{} - {}={}",
59               id,
60               record.offset(),
61               record.topic(),
62               record.partition(),
63               record.key(),
64               record.value()
65           );
66
67           recordHandler.accept(record);
68
69           consumed++;
70         }
71
72         rebalanceListener.beforeNextPoll();
73       }
74     }
75     catch(WakeupException e)
76     {
77       log.info("{} - RIIING! Request to stop consumption - commiting current offsets!", id);
78       shutdown();
79     }
80     catch(RecordDeserializationException e)
81     {
82       TopicPartition tp = e.topicPartition();
83       long offset = e.offset();
84       log.error(
85           "{} - Could not deserialize  message on topic {} with offset={}: {}",
86           id,
87           tp,
88           offset,
89           e.getCause().toString());
90
91       shutdown(e);
92     }
93     catch(Exception e)
94     {
95       log.error("{} - Unexpected error: {}, disabling commits", id, e.toString(), e);
96       rebalanceListener.disableCommits();
97       shutdown(e);
98     }
99     finally
100     {
101       log.info("{} - Consumer-Thread exiting", id);
102     }
103   }
104
105   private void shutdown()
106   {
107     shutdown(null);
108   }
109
110   private void shutdown(Exception e)
111   {
112     lock.lock();
113     try
114     {
115       try
116       {
117         log.info("{} - Unsubscribing from topic {}", id, topic);
118         consumer.unsubscribe();
119       }
120       catch (Exception ue)
121       {
122         log.error(
123             "{} - Error while unsubscribing from topic {}: {}",
124             id,
125             topic,
126             ue.toString());
127       }
128       finally
129       {
130         running = false;
131         exception = e;
132         condition.signal();
133       }
134     }
135     finally
136     {
137       lock.unlock();
138     }
139   }
140
141   public void start()
142   {
143     lock.lock();
144     try
145     {
146       if (running)
147         throw new IllegalStateException("Consumer instance " + id + " is already running!");
148
149       log.info("{} - Starting - consumed {} messages before", id, consumed);
150       running = true;
151       exception = null;
152       executor.submit(this);
153     }
154     finally
155     {
156       lock.unlock();
157     }
158   }
159
160   public synchronized void stop() throws InterruptedException
161   {
162     lock.lock();
163     try
164     {
165       if (!running)
166         throw new IllegalStateException("Consumer instance " + id + " is not running!");
167
168       log.info("{} - Stopping", id);
169       consumer.wakeup();
170       condition.await();
171       log.info("{} - Stopped - consumed {} messages so far", id, consumed);
172     }
173     finally
174     {
175       lock.unlock();
176     }
177   }
178
179   @PreDestroy
180   public void destroy() throws ExecutionException, InterruptedException
181   {
182     log.info("{} - Destroy!", id);
183     log.info("{}: Consumed {} messages in total, exiting!", id, consumed);
184   }
185
186   public boolean running()
187   {
188     lock.lock();
189     try
190     {
191       return running;
192     }
193     finally
194     {
195       lock.unlock();
196     }
197   }
198
199   public Optional<Exception> exitStatus()
200   {
201     lock.lock();
202     try
203     {
204       if (running)
205         throw new IllegalStateException("No exit-status available: Consumer instance " + id + " is running!");
206
207       return Optional.ofNullable(exception);
208     }
209     finally
210     {
211       lock.unlock();
212     }
213   }
214 }