From: Kai Moritz Date: Sat, 24 Sep 2022 12:46:40 +0000 (+0200) Subject: Implementierung an Folien angepasst: Beendigung durch `+Consumer.wakeup()+` X-Git-Tag: simple-consumer-vorlage~2 X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=50b6786696786c09c95f9aaa5c6851728832c89b;p=demos%2Fkafka%2Ftraining Implementierung an Folien angepasst: Beendigung durch `+Consumer.wakeup()+` --- diff --git a/src/main/java/de/juplo/kafka/SimpleConsumer.java b/src/main/java/de/juplo/kafka/SimpleConsumer.java index ef8d7e3..586bd07 100644 --- a/src/main/java/de/juplo/kafka/SimpleConsumer.java +++ b/src/main/java/de/juplo/kafka/SimpleConsumer.java @@ -4,6 +4,7 @@ import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.errors.WakeupException; import org.apache.kafka.common.serialization.StringDeserializer; import java.time.Duration; @@ -18,9 +19,8 @@ public class SimpleConsumer private final String topic; private final KafkaConsumer consumer; + private volatile boolean running = false; private long consumed = 0; - private volatile boolean running = true; - private volatile boolean done = false; public SimpleConsumer(String broker, String topic, String groupId, String clientId) { @@ -45,8 +45,9 @@ public class SimpleConsumer { log.info("{} - Subscribing to topic test", id); consumer.subscribe(Arrays.asList("test")); + running = true; - while (running) + while (true) { ConsumerRecords records = consumer.poll(Duration.ofSeconds(1)); @@ -67,16 +68,20 @@ public class SimpleConsumer } } } + catch(WakeupException e) + { + log.info("{} - Consumer was signaled to finish its work", id); + } catch(Exception e) { log.error("{} - Unexpected error: {}", id, e.toString()); } finally { + running = false; log.info("{} - Closing the KafkaConsumer", id); consumer.close(); log.info("{}: Consumed {} messages in total, exiting!", id, consumed); - done = true; } } @@ -105,8 +110,9 @@ public class SimpleConsumer Runtime.getRuntime().addShutdownHook(new Thread(() -> { - instance.running = false; - while (!instance.done) + instance.consumer.wakeup(); + + while (instance.running) { log.info("Waiting for main-thread..."); try