`instance.consumer.close()` von einem anderen Thread
authorKai Moritz <kai@juplo.de>
Tue, 1 Apr 2025 21:37:52 +0000 (23:37 +0200)
committerKai Moritz <kai@juplo.de>
Wed, 7 May 2025 19:14:14 +0000 (21:14 +0200)
src/main/java/de/juplo/kafka/ExampleConsumer.java

index b02d502..bc9a8f8 100644 (file)
@@ -16,19 +16,20 @@ import java.util.Properties;
 @Slf4j
 public class ExampleConsumer
 {
-  private String id;
-  private String topic;
-  private Consumer<String, String> consumer;
+  private final String id;
+  private final String topic;
+  private final Consumer<String, String> consumer;
 
+  private volatile boolean running = false;
   private long consumed = 0;
 
-  public static void main(String[] args) throws Exception
-  {
-    String broker = "localhost:9092";
-    String topic = "test";
-    String groupId = "my-group";
-    String clientId = "DEV";
 
+  public ExampleConsumer(
+    String broker,
+    String topic,
+    String groupId,
+    String clientId)
+  {
     Properties props = new Properties();
     props.put("bootstrap.servers", broker);
     props.put("group.id", groupId); // ID für die Offset-Commits
@@ -36,10 +37,16 @@ public class ExampleConsumer
     props.put("key.deserializer", StringDeserializer.class.getName());
     props.put("value.deserializer", StringDeserializer.class.getName());
 
-    Consumer<String, String> consumer = new KafkaConsumer<>(props);
+    this.consumer = new KafkaConsumer<>(props);
+    this.id = clientId;
+    this.topic = topic;
+  }
+
 
-    String id = clientId;
-    long consumed = 0;
+  void run()
+  {
+    running = true;
+    consumed = 0;
 
     try
     {
@@ -69,5 +76,36 @@ public class ExampleConsumer
       log.info("{}: Consumed {} messages in total, exiting!", id, consumed);
     }
   }
+
+
+  public static void main(String[] args) throws Exception
+  {
+    if (args.length !=4)
+    {
+      System.exit(1);
+      return;
+    }
+
+    ExampleConsumer instance = new ExampleConsumer(args[0], args[1], args[2], args[3]);
+
+    Runtime.getRuntime().addShutdownHook(new Thread(() ->
+    {
+      while(instance.running)
+      {
+        instance.consumer.close();
+
+        try
+        {
+          log.info("{}: Waiting...", instance.id);
+          Thread.sleep(1000);
+        }
+        catch (Exception e) {}
+      }
+
+      log.info("{}: DONE!", instance.id);
+    }));
+
+    instance.run();
+  }
 }