]> juplo.de Git - demos/kafka/training/commitdiff
Ordentlich mit `instance.consumer.wakeup()` grundlagen/simple-consumer--livecoding--schritte grundlagen/simple-consumer--livecoding--schritte--2026-06-lvm--rebase-vollständig
authorKai Moritz <kai@juplo.de>
Tue, 1 Apr 2025 21:41:24 +0000 (23:41 +0200)
committerKai Moritz <kai.milan.moritz@googlemail.com>
Fri, 12 Jun 2026 17:54:03 +0000 (19:54 +0200)
src/main/java/de/juplo/kafka/ExampleConsumer.java

index df4bbf0c7ebb5a66f2dc62cb79779721f7a8d9e2..aeae5343fb60ddd76f4bc318a8c26720d41bbbe8 100644 (file)
@@ -5,6 +5,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.serialization.StringDeserializer;
 
 import java.time.Duration;
@@ -67,6 +68,11 @@ public class ExampleConsumer
         }
       }
     }
+    catch(WakeupException e)
+    {
+      log.info("{}: Wakeup!", id);
+      consumer.close();
+    }
     finally
     {
       log.info("{}: Consumed {} messages in total, exiting!", id, consumed);
@@ -87,7 +93,7 @@ public class ExampleConsumer
 
     Runtime.getRuntime().addShutdownHook(new Thread(() ->
     {
-      instance.consumer.close();
+      instance.consumer.wakeup();
       while (instance.running)
       {
         log.info("Waiting...");