From: Kai Moritz Date: Tue, 1 Apr 2025 21:41:24 +0000 (+0200) Subject: Ordentlich mit `instance.consumer.wakeup()` X-Git-Tag: grundlagen/simple-consumer--livecoding--schritte--2026-03-22--22-01 X-Git-Url: https://juplo.de/gitweb/?a=commitdiff_plain;h=e250f9600f751b07fb0ca287a462b4cabc3073f8;p=demos%2Fkafka%2Ftraining Ordentlich mit `instance.consumer.wakeup()` --- diff --git a/src/main/java/de/juplo/kafka/ExampleConsumer.java b/src/main/java/de/juplo/kafka/ExampleConsumer.java index 86721572..223f0221 100644 --- a/src/main/java/de/juplo/kafka/ExampleConsumer.java +++ b/src/main/java/de/juplo/kafka/ExampleConsumer.java @@ -5,6 +5,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.errors.WakeupException; import org.apache.kafka.common.serialization.StringDeserializer; import java.time.Duration; @@ -67,6 +68,11 @@ public class ExampleConsumer } } } + catch(WakeupException e) + { + log.info("{}: Wakeup!", id); + consumer.close(); + } finally { log.info("{}: Consumed {} messages in total, exiting!", id, consumed); @@ -87,7 +93,7 @@ public class ExampleConsumer Runtime.getRuntime().addShutdownHook(new Thread(() -> { - instance.consumer.close(); + instance.consumer.wakeup(); while (instance.running) { log.info("Waiting...");