Endless Consumer: a simple consumer, implemented as Spring-Boot-App
[demos/kafka/training] / src / main / java / de / juplo / kafka / EndlessConsumer.java
1 package de.juplo.kafka;
2
3 import lombok.extern.slf4j.Slf4j;
4 import org.apache.kafka.clients.consumer.ConsumerRecord;
5 import org.apache.kafka.clients.consumer.ConsumerRecords;
6 import org.apache.kafka.clients.consumer.KafkaConsumer;
7 import org.apache.kafka.common.errors.WakeupException;
8 import org.apache.kafka.common.serialization.StringDeserializer;
9
10 import javax.annotation.PreDestroy;
11 import java.time.Duration;
12 import java.util.Arrays;
13 import java.util.Properties;
14 import java.util.concurrent.ExecutionException;
15 import java.util.concurrent.ExecutorService;
16 import java.util.concurrent.Future;
17 import java.util.concurrent.atomic.AtomicBoolean;
18
19
20 @Slf4j
21 public class EndlessConsumer implements Runnable
22 {
23   private final ExecutorService executor;
24   private final String bootstrapServer;
25   private final String groupId;
26   private final String id;
27   private final String topic;
28
29   private AtomicBoolean running = new AtomicBoolean();
30   private long consumed = 0;
31   private KafkaConsumer<String, String> consumer = null;
32   private Future<?> future = null;
33
34   public EndlessConsumer(
35       ExecutorService executor,
36       String bootstrapServer,
37       String groupId,
38       String clientId,
39       String topic)
40   {
41     this.executor = executor;
42     this.bootstrapServer = bootstrapServer;
43     this.groupId = groupId;
44     this.id = clientId;
45     this.topic = topic;
46   }
47
48   @Override
49   public void run()
50   {
51     Properties props = new Properties();
52     props.put("bootstrap.servers", bootstrapServer);
53     props.put("group.id", groupId);
54     props.put("client.id", id);
55     props.put("auto.offset.reset", "earliest");
56     props.put("key.deserializer", StringDeserializer.class.getName());
57     props.put("value.deserializer", StringDeserializer.class.getName());
58
59     this.consumer = new KafkaConsumer<>(props);
60
61     try
62     {
63       log.info("{} - Subscribing to topic {}", id, topic);
64       consumer.subscribe(Arrays.asList(topic));
65
66       while (true)
67       {
68         ConsumerRecords<String, String> records =
69             consumer.poll(Duration.ofSeconds(1));
70
71         // Do something with the data...
72         log.info("{} - Received {} messages", id, records.count());
73         for (ConsumerRecord<String, String> record : records)
74         {
75           consumed++;
76           log.info(
77               "{} - {}: {}/{} - {}={}",
78               id,
79               record.offset(),
80               record.topic(),
81               record.partition(),
82               record.key(),
83               record.value()
84           );
85         }
86       }
87     }
88     catch(WakeupException e)
89     {
90       log.info("{} - RIIING!", id);
91     }
92     catch(Exception e)
93     {
94       log.error("{} - Unexpected error: {}", id, e.toString());
95       running.set(false); // Mark the instance as not running
96     }
97     finally
98     {
99       log.info("{} - Closing the KafkaConsumer", id);
100       consumer.close();
101       log.info("{} - Consumer-Thread exiting", id);
102     }
103   }
104
105
106   public synchronized void start()
107   {
108     boolean stateChanged = running.compareAndSet(false, true);
109     if (!stateChanged)
110       throw new RuntimeException("Consumer instance " + id + " is already running!");
111
112     log.info("{} - Starting - consumed {} messages before", id, consumed);
113     future = executor.submit(this);
114   }
115
116   public synchronized void stop() throws ExecutionException, InterruptedException
117   {
118     boolean stateChanged = running.compareAndSet(true, false);
119     if (!stateChanged)
120       throw new RuntimeException("Consumer instance " + id + " is not running!");
121
122     log.info("{} - Stopping", id);
123     consumer.wakeup();
124     future.get();
125     log.info("{} - Stopped - consumed {} messages so far", id, consumed);
126   }
127
128   @PreDestroy
129   public void destroy() throws ExecutionException, InterruptedException
130   {
131     log.info("{} - Destroy!", id);
132     try
133     {
134       stop();
135     }
136     catch (IllegalStateException e)
137     {
138       log.info("{} - Was already stopped", id);
139     }
140     finally
141     {
142       log.info("{}: Consumed {} messages in total, exiting!", id, consumed);
143     }
144   }
145 }