]> juplo.de Git - demos/kafka/training/commitdiff
Ordentlich mit `instance.consumer.wakeup()` grundlagen/simple-consumer--livecoding--schritte--2026-03-22--20-47
authorKai Moritz <kai@juplo.de>
Tue, 1 Apr 2025 21:41:24 +0000 (23:41 +0200)
committerKai Moritz <kai@juplo.de>
Sun, 22 Mar 2026 14:42:20 +0000 (15:42 +0100)
src/main/java/de/juplo/kafka/ExampleConsumer.java

index 867215729072858783739fabfe759ec14261131d..223f0221de2547ca13ae006ae34a60be13554193 100644 (file)
@@ -5,6 +5,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.serialization.StringDeserializer;
 
 import java.time.Duration;
@@ -67,6 +68,11 @@ public class ExampleConsumer
         }
       }
     }
+    catch(WakeupException e)
+    {
+      log.info("{}: Wakeup!", id);
+      consumer.close();
+    }
     finally
     {
       log.info("{}: Consumed {} messages in total, exiting!", id, consumed);
@@ -87,7 +93,7 @@ public class ExampleConsumer
 
     Runtime.getRuntime().addShutdownHook(new Thread(() ->
     {
-      instance.consumer.close();
+      instance.consumer.wakeup();
       while (instance.running)
       {
         log.info("Waiting...");