From: Kai Moritz Date: Tue, 1 Apr 2025 21:37:52 +0000 (+0200) Subject: `instance.consumer.close()` von einem anderen Thread X-Git-Tag: grundlagen/simple-consumer--livecoding--schritte--2026-03-22--20-47~1 X-Git-Url: https://juplo.de/gitweb/?a=commitdiff_plain;h=124a5bb6cdc0ce062b311612aac4454f4e478454;p=demos%2Fkafka%2Ftraining `instance.consumer.close()` von einem anderen Thread --- diff --git a/src/main/java/de/juplo/kafka/ExampleConsumer.java b/src/main/java/de/juplo/kafka/ExampleConsumer.java index f9d4fc64..86721572 100644 --- a/src/main/java/de/juplo/kafka/ExampleConsumer.java +++ b/src/main/java/de/juplo/kafka/ExampleConsumer.java @@ -15,13 +15,19 @@ import java.util.Properties; @Slf4j public class ExampleConsumer { - public static void main(String[] args) throws Exception - { - String broker = "localhost:9092"; - String topic = "test"; - String groupId = "my-group"; - String clientId = "DEV"; + private final String id; + private final String topic; + private final Consumer consumer; + + private volatile boolean running = false; + private long consumed = 0; + public ExampleConsumer( + String broker, + String topic, + String groupId, + String clientId) + { Properties props = new Properties(); props.put("bootstrap.servers", broker); props.put("group.id", groupId); // ID für die Offset-Commits @@ -29,15 +35,19 @@ public class ExampleConsumer props.put("key.deserializer", StringDeserializer.class.getName()); props.put("value.deserializer", StringDeserializer.class.getName()); - Consumer consumer = new KafkaConsumer<>(props); + this.id = clientId; + this.topic = topic; + consumer = new KafkaConsumer<>(props); + } - String id = clientId; - long consumed = 0; + void run() + { try { log.info("{} - Subscribing to topic {}", id, topic); consumer.subscribe(Arrays.asList(topic)); + running = true; while (true) { @@ -60,7 +70,37 @@ public class ExampleConsumer finally { log.info("{}: Consumed {} messages in total, exiting!", id, consumed); + running = false; } } + + + public static void main(String[] args) throws Exception + { + if (args.length != 4) + { + System.exit(1); + return; + } + + ExampleConsumer instance = new ExampleConsumer(args[0], args[1], args[2], args[3]); + + Runtime.getRuntime().addShutdownHook(new Thread(() -> + { + instance.consumer.close(); + while (instance.running) + { + log.info("Waiting..."); + try + { + Thread.sleep(1000); + } + catch (InterruptedException e) {} + } + log.info("DONE!"); + })); + + instance.run(); + } }