From: Kai Moritz Date: Sun, 6 Apr 2025 10:03:36 +0000 (+0200) Subject: `ExampleConsumer` in `DeadLetterConsumer` umbenannt -- MOVE X-Git-Tag: consumer/nodlt--2026-03-20--19-06~25 X-Git-Url: https://juplo.de/gitweb/?a=commitdiff_plain;h=e7f63ed0be5f885608e4d4d25e4c3ff20a4f6cb8;p=demos%2Fkafka%2Ftraining `ExampleConsumer` in `DeadLetterConsumer` umbenannt -- MOVE --- diff --git a/src/main/java/de/juplo/kafka/DeadLetterConsumer.java b/src/main/java/de/juplo/kafka/DeadLetterConsumer.java new file mode 100644 index 00000000..0c06b10f --- /dev/null +++ b/src/main/java/de/juplo/kafka/DeadLetterConsumer.java @@ -0,0 +1,134 @@ +package de.juplo.kafka; + +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.common.errors.WakeupException; +import org.springframework.context.SmartLifecycle; + +import java.time.Duration; +import java.util.Arrays; + + +@Slf4j +public class ExampleConsumer implements Runnable, SmartLifecycle +{ + private final String id; + private final String topic; + private final Consumer consumer; + + private Thread workerThread; + private volatile boolean running = false; + private long consumed = 0; + + + public ExampleConsumer( + String clientId, + String topic, + Consumer consumer) + { + this.id = clientId; + this.topic = topic; + this.consumer = consumer; + } + + @Override + public synchronized void start() + { + if (running) + { + log.info("{} - Already running!", id); + return; + } + running = true; + workerThread = new Thread(this, "ExampleConsumer Worker-Thread"); + workerThread.start(); + } + + + @Override + public void run() + { + try + { + log.info("{} - Subscribing to topic {}", id, topic); + consumer.subscribe(Arrays.asList(topic)); + + while (true) + { + ConsumerRecords records = consumer.poll(Duration.ofSeconds(1)); + + log.info("{} - Received {} messages", id, records.count()); + for (ConsumerRecord record : records) + { + handleRecord( + record.topic(), + record.partition(), + record.offset(), + record.key(), + record.value()); + } + } + } + catch(WakeupException e) + { + log.info("{} - Consumer was signaled to finish its work", id); + } + catch(Exception e) + { + log.error("{} - Unexpected error, unsubscribing!", id, e); + consumer.unsubscribe(); + } + finally + { + log.info("{} - Closing the KafkaConsumer", id); + consumer.close(); + log.info("{}: Consumed {} messages in total, exiting!", id, consumed); + } + } + + private void handleRecord( + String topic, + Integer partition, + Long offset, + String key, + String value) + { + consumed++; + log.info("{} - partition={}-{}, offset={}: {}={}", id, topic, partition, offset, key, value); + } + + + @Override + public boolean isRunning() + { + return running; + } + + @Override + public synchronized void stop() + { + if (!running) + { + log.info("{} - Not running!", id); + return; + } + + running = false; + + log.info("{} - Waking up the consumer", id); + consumer.wakeup(); + + log.info("{} - Joining the worker thread", id); + try + { + workerThread.join(); + } + catch (InterruptedException e) + { + Thread.currentThread().interrupt(); + log.error("{} - Interrupted while waiting for worker thread", id, e); + } + } +} diff --git a/src/main/java/de/juplo/kafka/ExampleConsumer.java b/src/main/java/de/juplo/kafka/ExampleConsumer.java deleted file mode 100644 index 0c06b10f..00000000 --- a/src/main/java/de/juplo/kafka/ExampleConsumer.java +++ /dev/null @@ -1,134 +0,0 @@ -package de.juplo.kafka; - -import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.consumer.Consumer; -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.clients.consumer.ConsumerRecords; -import org.apache.kafka.common.errors.WakeupException; -import org.springframework.context.SmartLifecycle; - -import java.time.Duration; -import java.util.Arrays; - - -@Slf4j -public class ExampleConsumer implements Runnable, SmartLifecycle -{ - private final String id; - private final String topic; - private final Consumer consumer; - - private Thread workerThread; - private volatile boolean running = false; - private long consumed = 0; - - - public ExampleConsumer( - String clientId, - String topic, - Consumer consumer) - { - this.id = clientId; - this.topic = topic; - this.consumer = consumer; - } - - @Override - public synchronized void start() - { - if (running) - { - log.info("{} - Already running!", id); - return; - } - running = true; - workerThread = new Thread(this, "ExampleConsumer Worker-Thread"); - workerThread.start(); - } - - - @Override - public void run() - { - try - { - log.info("{} - Subscribing to topic {}", id, topic); - consumer.subscribe(Arrays.asList(topic)); - - while (true) - { - ConsumerRecords records = consumer.poll(Duration.ofSeconds(1)); - - log.info("{} - Received {} messages", id, records.count()); - for (ConsumerRecord record : records) - { - handleRecord( - record.topic(), - record.partition(), - record.offset(), - record.key(), - record.value()); - } - } - } - catch(WakeupException e) - { - log.info("{} - Consumer was signaled to finish its work", id); - } - catch(Exception e) - { - log.error("{} - Unexpected error, unsubscribing!", id, e); - consumer.unsubscribe(); - } - finally - { - log.info("{} - Closing the KafkaConsumer", id); - consumer.close(); - log.info("{}: Consumed {} messages in total, exiting!", id, consumed); - } - } - - private void handleRecord( - String topic, - Integer partition, - Long offset, - String key, - String value) - { - consumed++; - log.info("{} - partition={}-{}, offset={}: {}={}", id, topic, partition, offset, key, value); - } - - - @Override - public boolean isRunning() - { - return running; - } - - @Override - public synchronized void stop() - { - if (!running) - { - log.info("{} - Not running!", id); - return; - } - - running = false; - - log.info("{} - Waking up the consumer", id); - consumer.wakeup(); - - log.info("{} - Joining the worker thread", id); - try - { - workerThread.join(); - } - catch (InterruptedException e) - { - Thread.currentThread().interrupt(); - log.error("{} - Interrupted while waiting for worker thread", id, e); - } - } -}