]> juplo.de Git - demos/kafka/training/commitdiff
`instance.consumer.close()` von einem anderen Thread
authorKai Moritz <kai@juplo.de>
Tue, 1 Apr 2025 21:37:52 +0000 (23:37 +0200)
committerKai Moritz <kai@juplo.de>
Sun, 22 Mar 2026 14:42:20 +0000 (15:42 +0100)
src/main/java/de/juplo/kafka/ExampleConsumer.java

index f9d4fc64e9e432e4257b793d387ea42a80991203..867215729072858783739fabfe759ec14261131d 100644 (file)
@@ -15,13 +15,19 @@ import java.util.Properties;
 @Slf4j
 public class ExampleConsumer
 {
-  public static void main(String[] args) throws Exception
-  {
-    String broker = "localhost:9092";
-    String topic = "test";
-    String groupId = "my-group";
-    String clientId = "DEV";
+  private final String id;
+  private final String topic;
+  private final Consumer<String, String> consumer;
+
+  private volatile boolean running = false;
+  private long consumed = 0;
 
+  public ExampleConsumer(
+    String broker,
+    String topic,
+    String groupId,
+    String clientId)
+  {
     Properties props = new Properties();
     props.put("bootstrap.servers", broker);
     props.put("group.id", groupId); // ID für die Offset-Commits
@@ -29,15 +35,19 @@ public class ExampleConsumer
     props.put("key.deserializer", StringDeserializer.class.getName());
     props.put("value.deserializer", StringDeserializer.class.getName());
 
-    Consumer<String, String> consumer = new KafkaConsumer<>(props);
+    this.id = clientId;
+    this.topic = topic;
+    consumer = new KafkaConsumer<>(props);
+  }
 
-    String id = clientId;
-    long consumed = 0;
 
+  void run()
+  {
     try
     {
       log.info("{} - Subscribing to topic {}", id, topic);
       consumer.subscribe(Arrays.asList(topic));
+      running = true;
 
       while (true)
       {
@@ -60,7 +70,37 @@ public class ExampleConsumer
     finally
     {
       log.info("{}: Consumed {} messages in total, exiting!", id, consumed);
+      running = false;
     }
   }
+
+
+  public static void main(String[] args) throws Exception
+  {
+    if (args.length != 4)
+    {
+      System.exit(1);
+      return;
+    }
+
+    ExampleConsumer instance = new ExampleConsumer(args[0], args[1], args[2], args[3]);
+
+    Runtime.getRuntime().addShutdownHook(new Thread(() ->
+    {
+      instance.consumer.close();
+      while (instance.running)
+      {
+        log.info("Waiting...");
+        try
+        {
+          Thread.sleep(1000);
+        }
+        catch (InterruptedException e) {}
+      }
+      log.info("DONE!");
+    }));
+
+    instance.run();
+  }
 }