WIP
[demos/kafka/training] / src / main / java / de / juplo / kafka / EndlessConsumer.java
1 package de.juplo.kafka;
2
3 import lombok.RequiredArgsConstructor;
4 import lombok.extern.slf4j.Slf4j;
5 import org.apache.kafka.clients.consumer.*;
6 import org.apache.kafka.common.TopicPartition;
7 import org.apache.kafka.common.errors.RecordDeserializationException;
8 import org.apache.kafka.common.errors.WakeupException;
9 import org.springframework.stereotype.Component;
10
11 import javax.annotation.PreDestroy;
12 import java.time.Duration;
13 import java.util.*;
14 import java.util.concurrent.ExecutionException;
15 import java.util.concurrent.ExecutorService;
16 import java.util.concurrent.locks.Condition;
17 import java.util.concurrent.locks.Lock;
18 import java.util.concurrent.locks.ReentrantLock;
19
20
21 @Component
22 @Slf4j
23 @RequiredArgsConstructor
24 public class EndlessConsumer implements Runnable
25 {
26   private final ExecutorService executor;
27   private final String id;
28   private final String topic;
29   private final Consumer<K, V> consumer;
30   private final ConsumerRebalanceListener rebalanceListener;
31   private final RecordHandler<K, V> recordHandler;
32
33   private final Lock lock = new ReentrantLock();
34   private final Condition condition = lock.newCondition();
35   private boolean running = false;
36   private Exception exception;
37   private long consumed = 0;
38
39
40
41   @Override
42   public void run()
43   {
44     try
45     {
46       log.info("{} - Subscribing to topic {}", id, topic);
47       consumer.subscribe(Arrays.asList(topic), rebalanceListener);
48
49       while (true)
50       {
51         ConsumerRecords<K, V> records =
52             consumer.poll(Duration.ofSeconds(1));
53
54         // Do something with the data...
55         log.info("{} - Received {} messages", id, records.count());
56         for (ConsumerRecord<K, V> record : records)
57         {
58           log.info(
59               "{} - {}: {}/{} - {}={}",
60               id,
61               record.offset(),
62               record.topic(),
63               record.partition(),
64               record.key(),
65               record.value()
66           );
67
68           recordHandler.accept(record);
69
70           consumed++;
71         }
72       }
73     }
74     catch(WakeupException e)
75     {
76       log.info("{} - RIIING! Request to stop consumption - commiting current offsets!", id);
77       consumer.commitSync();
78       shutdown();
79     }
80     catch(RecordDeserializationException e)
81     {
82       TopicPartition tp = e.topicPartition();
83       long offset = e.offset();
84       log.error(
85           "{} - Could not deserialize  message on topic {} with offset={}: {}",
86           id,
87           tp,
88           offset,
89           e.getCause().toString());
90
91       consumer.commitSync();
92       shutdown(e);
93     }
94     catch(Exception e)
95     {
96       log.error("{} - Unexpected error: {}", id, e.toString(), e);
97       shutdown(e);
98     }
99     finally
100     {
101       log.info("{} - Consumer-Thread exiting", id);
102     }
103   }
104
105   private void shutdown()
106   {
107     shutdown(null);
108   }
109
110   private void shutdown(Exception e)
111   {
112     lock.lock();
113     try
114     {
115       try
116       {
117         log.info("{} - Unsubscribing from topic {}", id, topic);
118         consumer.unsubscribe();
119       }
120       catch (Exception ue)
121       {
122         log.error(
123             "{} - Error while unsubscribing from topic {}: {}",
124             id,
125             topic,
126             ue.toString());
127       }
128       finally
129       {
130         running = false;
131         exception = e;
132         condition.signal();
133       }
134     }
135     finally
136     {
137       lock.unlock();
138     }
139   }
140
141   public void start()
142   {
143     lock.lock();
144     try
145     {
146       if (running)
147         throw new IllegalStateException("Consumer instance " + id + " is already running!");
148
149       log.info("{} - Starting - consumed {} messages before", id, consumed);
150       running = true;
151       exception = null;
152       executor.submit(this);
153     }
154     finally
155     {
156       lock.unlock();
157     }
158   }
159
160   public synchronized void stop() throws InterruptedException
161   {
162     lock.lock();
163     try
164     {
165       if (!running)
166         throw new IllegalStateException("Consumer instance " + id + " is not running!");
167
168       log.info("{} - Stopping", id);
169       consumer.wakeup();
170       condition.await();
171       log.info("{} - Stopped - consumed {} messages so far", id, consumed);
172     }
173     finally
174     {
175       lock.unlock();
176     }
177   }
178
179   @PreDestroy
180   public void destroy() throws ExecutionException, InterruptedException
181   {
182     log.info("{} - Destroy!", id);
183     log.info("{}: Consumed {} messages in total, exiting!", id, consumed);
184   }
185
186   public boolean running()
187   {
188     lock.lock();
189     try
190     {
191       return running;
192     }
193     finally
194     {
195       lock.unlock();
196     }
197   }
198
199   public Optional<Exception> exitStatus()
200   {
201     lock.lock();
202     try
203     {
204       if (running)
205         throw new IllegalStateException("No exit-status available: Consumer instance " + id + " is running!");
206
207       return Optional.ofNullable(exception);
208     }
209     finally
210     {
211       lock.unlock();
212     }
213   }
214 }