Den ConsumerRunner in den ExampleConsumer integriert
authorKai Moritz <kai@juplo.de>
Tue, 6 May 2025 18:47:13 +0000 (20:47 +0200)
committerKai Moritz <kai@juplo.de>
Tue, 6 May 2025 19:09:33 +0000 (21:09 +0200)
src/main/java/de/juplo/kafka/ConsumerRunner.java [deleted file]
src/main/java/de/juplo/kafka/ExampleConsumer.java

diff --git a/src/main/java/de/juplo/kafka/ConsumerRunner.java b/src/main/java/de/juplo/kafka/ConsumerRunner.java
deleted file mode 100644 (file)
index b9a077a..0000000
+++ /dev/null
@@ -1,40 +0,0 @@
-package de.juplo.kafka;
-
-import jakarta.annotation.PreDestroy;
-import lombok.extern.slf4j.Slf4j;
-import org.springframework.stereotype.Component;
-
-import java.time.Duration;
-
-
-@Component
-@Slf4j
-public class ConsumerRunner
-{
-  private final ExampleConsumer exampleConsumer;
-  private final Thread worker;
-
-  public ConsumerRunner(ExampleConsumer exampleConsumer)
-  {
-    this.exampleConsumer = exampleConsumer;
-    this.worker = new Thread(exampleConsumer, "ConsumerRunner-" + exampleConsumer);
-    log.info("Starting consumer: {}", exampleConsumer);
-    this.worker.start();
-  }
-
-  @PreDestroy
-  public void close()
-  {
-    log.info("Stopping: {}", exampleConsumer);
-    exampleConsumer.consumer.wakeup();
-    try
-    {
-      worker.join(Duration.ofSeconds(30));
-    }
-    catch (InterruptedException e)
-    {
-      log.error("Fehler: {} - {}", exampleConsumer, e.toString());
-    }
-    log.info("Done! {}", exampleConsumer);
-  }
-}
index beaec07..9759913 100644 (file)
@@ -1,10 +1,10 @@
 package de.juplo.kafka;
 
-import lombok.ToString;
+import jakarta.annotation.PreDestroy;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.serialization.StringDeserializer;
@@ -15,12 +15,12 @@ import java.util.Properties;
 
 
 @Slf4j
-@ToString(of = "id")
 public class ExampleConsumer implements Runnable
 {
   private final String id;
   private final String topic;
-  final Consumer<String, String> consumer;
+  private final Consumer<String, String> consumer;
+  private final Thread worker;
 
   private long consumed = 0;
 
@@ -40,6 +40,10 @@ public class ExampleConsumer implements Runnable
     this.id = clientId;
     this.topic = topic;
     consumer = new KafkaConsumer<>(props);
+
+    this.worker = new Thread(this, "ConsumerRunner-" + id);
+    log.info("{} - Starting worker-thread", id);
+    this.worker.start();
   }
 
 
@@ -80,7 +84,7 @@ public class ExampleConsumer implements Runnable
     {
       log.info("{} - Closing the KafkaConsumer", id);
       consumer.close();
-      log.info("{}: Consumed {} messages in total, exiting!", id, consumed);
+      log.info("{} - Consumed {} messages in total, exiting!", id, consumed);
     }
   }
 
@@ -94,5 +98,22 @@ public class ExampleConsumer implements Runnable
     consumed++;
     log.info("{} - partition={}-{}, offset={}: {}={}", id, topic, partition, offset, key, value);
   }
+
+  @PreDestroy
+  public void shutdown()
+  {
+    log.info("{} - Waking up the consumer", id);
+    consumer.wakeup();
+    try
+    {
+      log.info("{} - Joining the worker-thread", id);
+      worker.join(Duration.ofSeconds(30));
+    }
+    catch (InterruptedException e)
+    {
+      log.error("{} - Joining was interrupted: {}", id, e.toString());
+    }
+    log.info("{} - Shutdown completed!", id);
+  }
 }