refactor: Implementierung an Branch `stored-offsets` angepasst
[demos/kafka/training] / src / main / java / de / juplo / kafka / EndlessConsumer.java
1 package de.juplo.kafka;
2
3 import lombok.RequiredArgsConstructor;
4 import lombok.extern.slf4j.Slf4j;
5 import org.apache.kafka.clients.consumer.*;
6 import org.apache.kafka.common.TopicPartition;
7 import org.apache.kafka.common.errors.RecordDeserializationException;
8 import org.apache.kafka.common.errors.WakeupException;
9
10 import javax.annotation.PreDestroy;
11 import java.time.Duration;
12 import java.util.*;
13 import java.util.concurrent.ExecutionException;
14 import java.util.concurrent.ExecutorService;
15 import java.util.concurrent.locks.Condition;
16 import java.util.concurrent.locks.Lock;
17 import java.util.concurrent.locks.ReentrantLock;
18
19
20 @Slf4j
21 @RequiredArgsConstructor
22 public class EndlessConsumer<K, V> implements ConsumerRebalanceListener, Runnable
23 {
24   private final ExecutorService executor;
25   private final String id;
26   private final String topic;
27   private final Consumer<K, V> consumer;
28   private final RecordHandler<K, V> handler;
29
30   private final Lock lock = new ReentrantLock();
31   private final Condition condition = lock.newCondition();
32   private boolean running = false;
33   private Exception exception;
34   private long consumed = 0;
35
36
37   @Override
38   public void onPartitionsRevoked(Collection<TopicPartition> partitions)
39   {
40     partitions.forEach(tp -> handler.onPartitionRevoked(tp));
41   }
42
43   @Override
44   public void onPartitionsAssigned(Collection<TopicPartition> partitions)
45   {
46     partitions.forEach(tp -> handler.onPartitionAssigned(tp));
47   }
48
49
50   @Override
51   public void run()
52   {
53     try
54     {
55       log.info("{} - Subscribing to topic {}", id, topic);
56       consumer.subscribe(Arrays.asList(topic), this);
57
58       while (true)
59       {
60         ConsumerRecords<K, V> records =
61             consumer.poll(Duration.ofSeconds(1));
62
63         // Do something with the data...
64         log.info("{} - Received {} messages", id, records.count());
65         for (ConsumerRecord<K, V> record : records)
66         {
67           log.info(
68               "{} - {}: {}/{} - {}={}",
69               id,
70               record.offset(),
71               record.topic(),
72               record.partition(),
73               record.key(),
74               record.value()
75           );
76
77           handler.accept(record);
78
79           consumed++;
80         }
81
82         handler.beforeNextPoll();
83       }
84     }
85     catch(WakeupException e)
86     {
87       log.info("{} - RIIING! Request to stop consumption - commiting current offsets!", id);
88       shutdown();
89     }
90     catch(RecordDeserializationException e)
91     {
92       TopicPartition tp = e.topicPartition();
93       long offset = e.offset();
94       log.error(
95           "{} - Could not deserialize  message on topic {} with offset={}: {}",
96           id,
97           tp,
98           offset,
99           e.getCause().toString());
100
101       shutdown(e);
102     }
103     catch(Exception e)
104     {
105       log.error("{} - Unexpected error: {}", id, e.toString(), e);
106       shutdown(e);
107     }
108     finally
109     {
110       log.info("{} - Consumer-Thread exiting", id);
111     }
112   }
113
114   private void shutdown()
115   {
116     shutdown(null);
117   }
118
119   private void shutdown(Exception e)
120   {
121     lock.lock();
122     try
123     {
124       try
125       {
126         log.info("{} - Unsubscribing from topic {}", id, topic);
127         consumer.unsubscribe();
128       }
129       catch (Exception ue)
130       {
131         log.error(
132             "{} - Error while unsubscribing from topic {}: {}",
133             id,
134             topic,
135             ue.toString());
136       }
137       finally
138       {
139         running = false;
140         exception = e;
141         condition.signal();
142       }
143     }
144     finally
145     {
146       lock.unlock();
147     }
148   }
149
150   public void start()
151   {
152     lock.lock();
153     try
154     {
155       if (running)
156         throw new IllegalStateException("Consumer instance " + id + " is already running!");
157
158       log.info("{} - Starting - consumed {} messages before", id, consumed);
159       running = true;
160       exception = null;
161       executor.submit(this);
162     }
163     finally
164     {
165       lock.unlock();
166     }
167   }
168
169   public synchronized void stop() throws InterruptedException
170   {
171     lock.lock();
172     try
173     {
174       if (!running)
175         throw new IllegalStateException("Consumer instance " + id + " is not running!");
176
177       log.info("{} - Stopping", id);
178       consumer.wakeup();
179       condition.await();
180       log.info("{} - Stopped - consumed {} messages so far", id, consumed);
181     }
182     finally
183     {
184       lock.unlock();
185     }
186   }
187
188   @PreDestroy
189   public void destroy() throws ExecutionException, InterruptedException
190   {
191     log.info("{} - Destroy!", id);
192     log.info("{}: Consumed {} messages in total, exiting!", id, consumed);
193   }
194
195   public boolean running()
196   {
197     lock.lock();
198     try
199     {
200       return running;
201     }
202     finally
203     {
204       lock.unlock();
205     }
206   }
207
208   public Optional<Exception> exitStatus()
209   {
210     lock.lock();
211     try
212     {
213       if (running)
214         throw new IllegalStateException("No exit-status available: Consumer instance " + id + " is running!");
215
216       return Optional.ofNullable(exception);
217     }
218     finally
219     {
220       lock.unlock();
221     }
222   }
223 }