Merge der Upgrades für Confluent/Spring-Boot (Branch 'first-contact')
[demos/kafka/training] / src / main / java / de / juplo / kafka / EndlessConsumer.java
1 package de.juplo.kafka;
2
3 import lombok.extern.slf4j.Slf4j;
4 import org.apache.kafka.clients.consumer.ConsumerRecord;
5 import org.apache.kafka.clients.consumer.ConsumerRecords;
6 import org.apache.kafka.clients.consumer.KafkaConsumer;
7 import org.apache.kafka.common.errors.WakeupException;
8 import org.apache.kafka.common.serialization.StringDeserializer;
9
10 import javax.annotation.PreDestroy;
11 import java.time.Duration;
12 import java.util.Arrays;
13 import java.util.Optional;
14 import java.util.Properties;
15 import java.util.concurrent.ExecutionException;
16 import java.util.concurrent.ExecutorService;
17 import java.util.concurrent.locks.Condition;
18 import java.util.concurrent.locks.Lock;
19 import java.util.concurrent.locks.ReentrantLock;
20
21
22 @Slf4j
23 public class EndlessConsumer implements Runnable
24 {
25   private final ExecutorService executor;
26   private final String bootstrapServer;
27   private final String groupId;
28   private final String id;
29   private final String topic;
30   private final String autoOffsetReset;
31
32   private final Lock lock = new ReentrantLock();
33   private final Condition condition = lock.newCondition();
34   private boolean running = false;
35   private Exception exception;
36   private long consumed = 0;
37   private KafkaConsumer<String, String> consumer = null;
38
39
40   public EndlessConsumer(
41       ExecutorService executor,
42       String bootstrapServer,
43       String groupId,
44       String clientId,
45       String topic,
46       String autoOffsetReset)
47   {
48     this.executor = executor;
49     this.bootstrapServer = bootstrapServer;
50     this.groupId = groupId;
51     this.id = clientId;
52     this.topic = topic;
53     this.autoOffsetReset = autoOffsetReset;
54   }
55
56   @Override
57   public void run()
58   {
59     try
60     {
61       Properties props = new Properties();
62       props.put("bootstrap.servers", bootstrapServer);
63       props.put("group.id", groupId);
64       props.put("client.id", id);
65       props.put("auto.offset.reset", autoOffsetReset);
66       props.put("key.deserializer", StringDeserializer.class.getName());
67       props.put("value.deserializer", StringDeserializer.class.getName());
68
69       this.consumer = new KafkaConsumer<>(props);
70
71       log.info("{} - Subscribing to topic {}", id, topic);
72       consumer.subscribe(Arrays.asList(topic));
73
74       while (true)
75       {
76         ConsumerRecords<String, String> records =
77             consumer.poll(Duration.ofSeconds(1));
78
79         // Do something with the data...
80         log.info("{} - Received {} messages", id, records.count());
81         for (ConsumerRecord<String, String> record : records)
82         {
83           consumed++;
84           log.info(
85               "{} - {}: {}/{} - {}={}",
86               id,
87               record.offset(),
88               record.topic(),
89               record.partition(),
90               record.key(),
91               record.value()
92           );
93         }
94       }
95     }
96     catch(WakeupException e)
97     {
98       log.info("{} - RIIING!", id);
99       shutdown();
100     }
101     catch(Exception e)
102     {
103       log.error("{} - Unexpected error: {}", id, e.toString(), e);
104       shutdown(e);
105     }
106     finally
107     {
108       log.info("{} - Closing the KafkaConsumer", id);
109       consumer.close();
110       log.info("{} - Consumer-Thread exiting", id);
111     }
112   }
113
114   private void shutdown()
115   {
116     shutdown(null);
117   }
118
119   private void shutdown(Exception e)
120   {
121     lock.lock();
122     try
123     {
124       running = false;
125       exception = e;
126       condition.signal();
127     }
128     finally
129     {
130       lock.unlock();
131     }
132   }
133
134   public void start()
135   {
136     lock.lock();
137     try
138     {
139       if (running)
140         throw new IllegalStateException("Consumer instance " + id + " is already running!");
141
142       log.info("{} - Starting - consumed {} messages before", id, consumed);
143       running = true;
144       exception = null;
145       executor.submit(this);
146     }
147     finally
148     {
149       lock.unlock();
150     }
151   }
152
153   public synchronized void stop() throws ExecutionException, InterruptedException
154   {
155     lock.lock();
156     try
157     {
158       if (!running)
159         throw new IllegalStateException("Consumer instance " + id + " is not running!");
160
161       log.info("{} - Stopping", id);
162       consumer.wakeup();
163       condition.await();
164       log.info("{} - Stopped - consumed {} messages so far", id, consumed);
165     }
166     finally
167     {
168       lock.unlock();
169     }
170   }
171
172   @PreDestroy
173   public void destroy() throws ExecutionException, InterruptedException
174   {
175     log.info("{} - Destroy!", id);
176     try
177     {
178       stop();
179     }
180     catch (IllegalStateException e)
181     {
182       log.info("{} - Was already stopped", id);
183     }
184     catch (Exception e)
185     {
186       log.error("{} - Unexpected exception while trying to stop the consumer", id, e);
187     }
188     finally
189     {
190       log.info("{}: Consumed {} messages in total, exiting!", id, consumed);
191     }
192   }
193
194   public boolean running()
195   {
196     lock.lock();
197     try
198     {
199       return running;
200     }
201     finally
202     {
203       lock.unlock();
204     }
205   }
206
207   public Optional<Exception> exitStatus()
208   {
209     lock.lock();
210     try
211     {
212       if (running)
213         throw new IllegalStateException("No exit-status available: Consumer instance " + id + " is running!");
214
215       return Optional.ofNullable(exception);
216     }
217     finally
218     {
219       lock.unlock();
220     }
221   }
222 }