b3dd446d84ddbc7fddd44d516cbd921eff1ece7e
[demos/kafka/training] / src / main / java / de / juplo / kafka / EndlessConsumer.java
1 package de.juplo.kafka;
2
3 import lombok.extern.slf4j.Slf4j;
4 import org.apache.kafka.clients.consumer.ConsumerRecord;
5 import org.apache.kafka.clients.consumer.ConsumerRecords;
6 import org.apache.kafka.clients.consumer.KafkaConsumer;
7 import org.apache.kafka.common.errors.WakeupException;
8 import org.apache.kafka.common.serialization.StringDeserializer;
9
10 import javax.annotation.PreDestroy;
11 import java.time.Duration;
12 import java.util.Arrays;
13 import java.util.Properties;
14 import java.util.concurrent.ExecutionException;
15 import java.util.concurrent.ExecutorService;
16 import java.util.concurrent.Future;
17 import java.util.concurrent.atomic.AtomicBoolean;
18
19
20 @Slf4j
21 public class EndlessConsumer implements Runnable
22 {
23   private final ExecutorService executor;
24   private final String bootstrapServer;
25   private final String groupId;
26   private final String id;
27   private final String topic;
28   private final String autoOffsetReset;
29
30   private AtomicBoolean running = new AtomicBoolean();
31   private long consumed = 0;
32   private KafkaConsumer<String, String> consumer = null;
33   private Future<?> future = null;
34
35   public EndlessConsumer(
36       ExecutorService executor,
37       String bootstrapServer,
38       String groupId,
39       String clientId,
40       String topic,
41       String autoOffsetReset)
42   {
43     this.executor = executor;
44     this.bootstrapServer = bootstrapServer;
45     this.groupId = groupId;
46     this.id = clientId;
47     this.topic = topic;
48     this.autoOffsetReset = autoOffsetReset;
49   }
50
51   @Override
52   public void run()
53   {
54     try
55     {
56       Properties props = new Properties();
57       props.put("bootstrap.servers", bootstrapServer);
58       props.put("group.id", groupId);
59       props.put("client.id", id);
60       props.put("auto.offset.reset", autoOffsetReset);
61       props.put("key.deserializer", StringDeserializer.class.getName());
62       props.put("value.deserializer", StringDeserializer.class.getName());
63
64       this.consumer = new KafkaConsumer<>(props);
65
66       log.info("{} - Subscribing to topic {}", id, topic);
67       consumer.subscribe(Arrays.asList(topic));
68
69       while (true)
70       {
71         ConsumerRecords<String, String> records =
72             consumer.poll(Duration.ofSeconds(1));
73
74         // Do something with the data...
75         log.info("{} - Received {} messages", id, records.count());
76         for (ConsumerRecord<String, String> record : records)
77         {
78           consumed++;
79           log.info(
80               "{} - {}: {}/{} - {}={}",
81               id,
82               record.offset(),
83               record.topic(),
84               record.partition(),
85               record.key(),
86               record.value()
87           );
88         }
89       }
90     }
91     catch(WakeupException e)
92     {
93       log.info("{} - RIIING!", id);
94     }
95     catch(Exception e)
96     {
97       log.error("{} - Unexpected error: {}", id, e.toString(), e);
98       running.set(false); // Mark the instance as not running
99     }
100     finally
101     {
102       log.info("{} - Closing the KafkaConsumer", id);
103       consumer.close();
104       log.info("{} - Consumer-Thread exiting", id);
105     }
106   }
107
108
109   public synchronized void start()
110   {
111     boolean stateChanged = running.compareAndSet(false, true);
112     if (!stateChanged)
113       throw new RuntimeException("Consumer instance " + id + " is already running!");
114
115     log.info("{} - Starting - consumed {} messages before", id, consumed);
116     future = executor.submit(this);
117   }
118
119   public synchronized void stop() throws ExecutionException, InterruptedException
120   {
121     boolean stateChanged = running.compareAndSet(true, false);
122     if (!stateChanged)
123       throw new RuntimeException("Consumer instance " + id + " is not running!");
124
125     log.info("{} - Stopping", id);
126     consumer.wakeup();
127     future.get();
128     log.info("{} - Stopped - consumed {} messages so far", id, consumed);
129   }
130
131   @PreDestroy
132   public void destroy() throws ExecutionException, InterruptedException
133   {
134     log.info("{} - Destroy!", id);
135     try
136     {
137       stop();
138     }
139     catch (IllegalStateException e)
140     {
141       log.info("{} - Was already stopped", id);
142     }
143     finally
144     {
145       log.info("{}: Consumed {} messages in total, exiting!", id, consumed);
146     }
147   }
148 }