From 63d1fb0a0d2615964a49fadbf9ff1c8beb35b258 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Tue, 1 Apr 2025 23:41:24 +0200 Subject: [PATCH] Ordentlich mit `instance.consumer.wakeup()` --- src/main/java/de/juplo/kafka/ExampleConsumer.java | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/src/main/java/de/juplo/kafka/ExampleConsumer.java b/src/main/java/de/juplo/kafka/ExampleConsumer.java index 86721572..223f0221 100644 --- a/src/main/java/de/juplo/kafka/ExampleConsumer.java +++ b/src/main/java/de/juplo/kafka/ExampleConsumer.java @@ -5,6 +5,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.errors.WakeupException; import org.apache.kafka.common.serialization.StringDeserializer; import java.time.Duration; @@ -67,6 +68,11 @@ public class ExampleConsumer } } } + catch(WakeupException e) + { + log.info("{}: Wakeup!", id); + consumer.close(); + } finally { log.info("{}: Consumed {} messages in total, exiting!", id, consumed); @@ -87,7 +93,7 @@ public class ExampleConsumer Runtime.getRuntime().addShutdownHook(new Thread(() -> { - instance.consumer.close(); + instance.consumer.wakeup(); while (instance.running) { log.info("Waiting..."); -- 2.39.5