From: Kai Moritz Date: Tue, 6 May 2025 18:47:13 +0000 (+0200) Subject: Den ConsumerRunner in den ExampleConsumer integriert X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=09aecbbe7fa33fc8f91ec70de6cfe7f4f7ac3eed;p=demos%2Fkafka%2Ftraining Den ConsumerRunner in den ExampleConsumer integriert --- diff --git a/src/main/java/de/juplo/kafka/ConsumerRunner.java b/src/main/java/de/juplo/kafka/ConsumerRunner.java deleted file mode 100644 index b9a077a..0000000 --- a/src/main/java/de/juplo/kafka/ConsumerRunner.java +++ /dev/null @@ -1,40 +0,0 @@ -package de.juplo.kafka; - -import jakarta.annotation.PreDestroy; -import lombok.extern.slf4j.Slf4j; -import org.springframework.stereotype.Component; - -import java.time.Duration; - - -@Component -@Slf4j -public class ConsumerRunner -{ - private final ExampleConsumer exampleConsumer; - private final Thread worker; - - public ConsumerRunner(ExampleConsumer exampleConsumer) - { - this.exampleConsumer = exampleConsumer; - this.worker = new Thread(exampleConsumer, "ConsumerRunner-" + exampleConsumer); - log.info("Starting consumer: {}", exampleConsumer); - this.worker.start(); - } - - @PreDestroy - public void close() - { - log.info("Stopping: {}", exampleConsumer); - exampleConsumer.consumer.wakeup(); - try - { - worker.join(Duration.ofSeconds(30)); - } - catch (InterruptedException e) - { - log.error("Fehler: {} - {}", exampleConsumer, e.toString()); - } - log.info("Done! {}", exampleConsumer); - } -} diff --git a/src/main/java/de/juplo/kafka/ExampleConsumer.java b/src/main/java/de/juplo/kafka/ExampleConsumer.java index beaec07..9759913 100644 --- a/src/main/java/de/juplo/kafka/ExampleConsumer.java +++ b/src/main/java/de/juplo/kafka/ExampleConsumer.java @@ -1,10 +1,10 @@ package de.juplo.kafka; -import lombok.ToString; +import jakarta.annotation.PreDestroy; import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; -import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.errors.WakeupException; import org.apache.kafka.common.serialization.StringDeserializer; @@ -15,12 +15,12 @@ import java.util.Properties; @Slf4j -@ToString(of = "id") public class ExampleConsumer implements Runnable { private final String id; private final String topic; - final Consumer consumer; + private final Consumer consumer; + private final Thread worker; private long consumed = 0; @@ -40,6 +40,10 @@ public class ExampleConsumer implements Runnable this.id = clientId; this.topic = topic; consumer = new KafkaConsumer<>(props); + + this.worker = new Thread(this, "ConsumerRunner-" + id); + log.info("{} - Starting worker-thread", id); + this.worker.start(); } @@ -80,7 +84,7 @@ public class ExampleConsumer implements Runnable { log.info("{} - Closing the KafkaConsumer", id); consumer.close(); - log.info("{}: Consumed {} messages in total, exiting!", id, consumed); + log.info("{} - Consumed {} messages in total, exiting!", id, consumed); } } @@ -94,5 +98,22 @@ public class ExampleConsumer implements Runnable consumed++; log.info("{} - partition={}-{}, offset={}: {}={}", id, topic, partition, offset, key, value); } + + @PreDestroy + public void shutdown() + { + log.info("{} - Waking up the consumer", id); + consumer.wakeup(); + try + { + log.info("{} - Joining the worker-thread", id); + worker.join(Duration.ofSeconds(30)); + } + catch (InterruptedException e) + { + log.error("{} - Joining was interrupted: {}", id, e.toString()); + } + log.info("{} - Shutdown completed!", id); + } }