Implementierung an Folien angepasst: Beendigung durch `+Consumer.wakeup()+`
authorKai Moritz <kai@juplo.de>
Sat, 24 Sep 2022 12:46:40 +0000 (14:46 +0200)
committerKai Moritz <kai@juplo.de>
Sat, 24 Sep 2022 12:46:40 +0000 (14:46 +0200)
src/main/java/de/juplo/kafka/SimpleConsumer.java

index ef8d7e3..586bd07 100644 (file)
@@ -4,6 +4,7 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.serialization.StringDeserializer;
 
 import java.time.Duration;
@@ -18,9 +19,8 @@ public class SimpleConsumer
   private final String topic;
   private final KafkaConsumer<String, String> consumer;
 
+  private volatile boolean running = false;
   private long consumed = 0;
-  private volatile boolean running = true;
-  private volatile boolean done = false;
 
   public SimpleConsumer(String broker, String topic, String groupId, String clientId)
   {
@@ -45,8 +45,9 @@ public class SimpleConsumer
     {
       log.info("{} - Subscribing to topic test", id);
       consumer.subscribe(Arrays.asList("test"));
+      running = true;
 
-      while (running)
+      while (true)
       {
         ConsumerRecords<String, String> records =
             consumer.poll(Duration.ofSeconds(1));
@@ -67,16 +68,20 @@ public class SimpleConsumer
         }
       }
     }
+    catch(WakeupException e)
+    {
+      log.info("{} - Consumer was signaled to finish its work", id);
+    }
     catch(Exception e)
     {
       log.error("{} - Unexpected error: {}", id, e.toString());
     }
     finally
     {
+      running = false;
       log.info("{} - Closing the KafkaConsumer", id);
       consumer.close();
       log.info("{}: Consumed {} messages in total, exiting!", id, consumed);
-      done = true;
     }
   }
 
@@ -105,8 +110,9 @@ public class SimpleConsumer
 
     Runtime.getRuntime().addShutdownHook(new Thread(() ->
     {
-      instance.running = false;
-      while (!instance.done)
+      instance.consumer.wakeup();
+
+      while (instance.running)
       {
         log.info("Waiting for main-thread...");
         try