WIP
authorKai Moritz <kai@juplo.de>
Wed, 2 Nov 2022 17:36:14 +0000 (18:36 +0100)
committerKai Moritz <kai@juplo.de>
Wed, 2 Nov 2022 17:36:14 +0000 (18:36 +0100)
src/main/java/de/juplo/kafka/Application.java
src/main/java/de/juplo/kafka/SimpleConsumer.java [new file with mode: 0644]

index b4a960d..94224e1 100644 (file)
@@ -11,8 +11,6 @@ import org.springframework.context.annotation.Bean;
 import org.springframework.kafka.core.ConsumerFactory;
 
 import javax.annotation.PreDestroy;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.TimeUnit;
 
 
 @SpringBootApplication
diff --git a/src/main/java/de/juplo/kafka/SimpleConsumer.java b/src/main/java/de/juplo/kafka/SimpleConsumer.java
new file mode 100644 (file)
index 0000000..0d371f4
--- /dev/null
@@ -0,0 +1,80 @@
+package de.juplo.kafka;
+
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.common.errors.WakeupException;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.concurrent.ExecutorService;
+
+
+@Slf4j
+@RequiredArgsConstructor
+public class SimpleConsumer implements Runnable
+{
+  private final ExecutorService executor;
+  private final String id;
+  private final String topic;
+  private final Consumer<String, Message> consumer;
+
+  private volatile boolean running = false;
+  private long consumed = 0;
+
+
+  @Override
+  public void run()
+  {
+    try
+    {
+      log.info("{} - Subscribing to topic test", id);
+      consumer.subscribe(Arrays.asList("test"));
+      running = true;
+
+      while (true)
+      {
+        ConsumerRecords<String, Message> records =
+            consumer.poll(Duration.ofSeconds(1));
+
+        log.info("{} - Received {} messages", id, records.count());
+        for (ConsumerRecord<String, Message> record : records)
+        {
+          consumed++;
+          log.info(
+              "{} - {}: {}/{} - {}={}",
+              id,
+              record.offset(),
+              record.topic(),
+              record.partition(),
+              record.key(),
+              record.value()
+          );
+        }
+      }
+    }
+    catch(WakeupException e)
+    {
+      log.info("{} - Consumer was signaled to finish its work", id);
+    }
+    catch(Exception e)
+    {
+      log.error("{} - Unexpected error: {}, unsubscribing!", id, e.toString());
+      consumer.unsubscribe();
+    }
+    finally
+    {
+      running = false;
+      log.info("{} - Closing the KafkaConsumer", id);
+      consumer.close();
+      log.info("{}: Consumed {} messages in total, exiting!", id, consumed);
+    }
+  }
+
+  public void start()
+  {
+    executor.submit(this);
+  }
+}