From 610cb0e7e5dba71aacf3b67c44e11d400202304c Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Tue, 1 Apr 2025 23:37:52 +0200 Subject: [PATCH] `instance.consumer.close()` von einem anderen Thread --- .../java/de/juplo/kafka/ExampleConsumer.java | 62 +++++++++++++++---- 1 file changed, 50 insertions(+), 12 deletions(-) diff --git a/src/main/java/de/juplo/kafka/ExampleConsumer.java b/src/main/java/de/juplo/kafka/ExampleConsumer.java index b02d502..bc9a8f8 100644 --- a/src/main/java/de/juplo/kafka/ExampleConsumer.java +++ b/src/main/java/de/juplo/kafka/ExampleConsumer.java @@ -16,19 +16,20 @@ import java.util.Properties; @Slf4j public class ExampleConsumer { - private String id; - private String topic; - private Consumer consumer; + private final String id; + private final String topic; + private final Consumer consumer; + private volatile boolean running = false; private long consumed = 0; - public static void main(String[] args) throws Exception - { - String broker = "localhost:9092"; - String topic = "test"; - String groupId = "my-group"; - String clientId = "DEV"; + public ExampleConsumer( + String broker, + String topic, + String groupId, + String clientId) + { Properties props = new Properties(); props.put("bootstrap.servers", broker); props.put("group.id", groupId); // ID für die Offset-Commits @@ -36,10 +37,16 @@ public class ExampleConsumer props.put("key.deserializer", StringDeserializer.class.getName()); props.put("value.deserializer", StringDeserializer.class.getName()); - Consumer consumer = new KafkaConsumer<>(props); + this.consumer = new KafkaConsumer<>(props); + this.id = clientId; + this.topic = topic; + } + - String id = clientId; - long consumed = 0; + void run() + { + running = true; + consumed = 0; try { @@ -69,5 +76,36 @@ public class ExampleConsumer log.info("{}: Consumed {} messages in total, exiting!", id, consumed); } } + + + public static void main(String[] args) throws Exception + { + if (args.length !=4) + { + System.exit(1); + return; + } + + ExampleConsumer instance = new ExampleConsumer(args[0], args[1], args[2], args[3]); + + Runtime.getRuntime().addShutdownHook(new Thread(() -> + { + while(instance.running) + { + instance.consumer.close(); + + try + { + log.info("{}: Waiting...", instance.id); + Thread.sleep(1000); + } + catch (Exception e) {} + } + + log.info("{}: DONE!", instance.id); + })); + + instance.run(); + } } -- 2.20.1