+package de.juplo.kafka.seek;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+
+import javax.annotation.PreDestroy;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+
+
+@Slf4j
+public class Consumer implements Runnable
+{
+ private final ExecutorService executor;
+ private final String id;
+ private final String topic;
+ private final KafkaConsumer<Long, String> consumer;
+
+ private boolean running = false;
+ Future<?> future = null;
+
+
+ public Consumer(
+ ExecutorService executor,
+ String bootstrapServer,
+ String groupId,
+ String clientId,
+ String topic)
+ {
+ this.executor = executor;
+
+ this.id = clientId;
+ this.topic = topic;
+
+ Properties props = new Properties();
+ props.put("bootstrap.servers", bootstrapServer);
+ props.put("group.id", groupId);
+ props.put("client.id", clientId);
+ props.put("key.deserializer", LongDeserializer.class.getName());
+ props.put("value.deserializer", StringDeserializer.class.getName());
+
+ consumer = new KafkaConsumer<>(props);
+ }
+
+
+ @Override
+ public void run()
+ {
+ log.info("{} - Subscribing to topic test", id);
+ consumer.subscribe(Arrays.asList(topic));
+
+ try
+ {
+
+ running = true;
+
+ while (running)
+ {
+ ConsumerRecords<Long, String> records = consumer.poll(Duration.ofSeconds(1));
+ for (ConsumerRecord<Long, String> record : records)
+ log.info(
+ "{} - {}: {}/{} - {}",
+ id,
+ record.offset(),
+ record.topic(),
+ record.partition(),
+ record.value()
+ );
+ }
+ }
+ catch(WakeupException e)
+ {
+ log.info("{} - RIIING!", id);
+ }
+ finally
+ {
+ log.info("{} - Unsubscribing...", id);
+ consumer.unsubscribe();
+ running = false;
+ }
+ }
+
+ public synchronized void start()
+ {
+ if (running)
+ throw new RuntimeException("Consumier instance " + id + " is already running!");
+
+ log.info("Running {}", id);
+ future = executor.submit(this);
+ }
+
+ public synchronized void stop() throws ExecutionException, InterruptedException
+ {
+ if (!running)
+ throw new RuntimeException("Consumier instance " + id + " is not running!");
+
+ log.info("Stopping {}", id);
+ running = false;
+ consumer.wakeup();
+ future.get();
+ }
+
+ @PreDestroy
+ public void destroy() throws ExecutionException, InterruptedException
+ {
+ stop();
+ log.info("{} - Closing the KafkaConsumer", id);
+ consumer.close(Duration.ofSeconds(3));
+ }
+}