From 92037837a80e559234d70f94a86ab5e8f50547ea Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sun, 15 Dec 2024 09:46:58 +0100 Subject: [PATCH] WIP:start+stop --- .../java/de/juplo/kafka/ExampleConsumer.java | 128 +++++++++++++----- 1 file changed, 95 insertions(+), 33 deletions(-) diff --git a/src/main/java/de/juplo/kafka/ExampleConsumer.java b/src/main/java/de/juplo/kafka/ExampleConsumer.java index 8b2611c5..65a1b5d1 100644 --- a/src/main/java/de/juplo/kafka/ExampleConsumer.java +++ b/src/main/java/de/juplo/kafka/ExampleConsumer.java @@ -10,6 +10,8 @@ import org.apache.kafka.common.errors.WakeupException; import java.time.Duration; import java.util.Arrays; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; @Slf4j @@ -20,9 +22,10 @@ public class ExampleConsumer implements Runnable private final Consumer consumer; private final RecordHandler recordHandler; private final ConsumerRebalanceListener rebalanceListener; - private final Thread workerThread; private final Runnable closeCallback; + private final Lock lock = new ReentrantLock(true); + private Thread workerThread; private volatile boolean running = false; private long consumed = 0; @@ -40,11 +43,9 @@ public class ExampleConsumer implements Runnable this.consumer = consumer; this.recordHandler = recordHandler; this.rebalanceListener = rebalanceListener; - - workerThread = new Thread(this, "ExampleConsumer Worker-Thread"); - workerThread.start(); - this.closeCallback = closeCallback; + + start(); } @@ -59,27 +60,7 @@ public class ExampleConsumer implements Runnable while (running) { - try - { - ConsumerRecords records = - consumer.poll(Duration.ofSeconds(1)); - - log.info("{} - Received {} messages", id, records.count()); - for (ConsumerRecord record : records) - { - handleRecord( - record.topic(), - record.partition(), - record.offset(), - record.key(), - record.value()); - } - } - catch(RecordDeserializationException e) - { - log.error("{} - Deserialization Exception {}:{}", id, e.topicPartition(), e.offset()); - consumer.seek(e.topicPartition(), e.offset() +1); - } + pollAndHandleRecords(); } } catch(WakeupException e) @@ -93,11 +74,38 @@ public class ExampleConsumer implements Runnable log.info("{} - Triggering exit of application!", id); new Thread(closeCallback).start(); } + } + + private void pollAndHandleRecords() + { + lock.lock(); + try + { + ConsumerRecords records = consumer.poll(Duration.ofSeconds(1)); + + log.info("{} - Received {} messages", id, records.count()); + for (ConsumerRecord record : records) + { + handleRecord( + record.topic(), + record.partition(), + record.offset(), + record.key(), + record.value()); + } + } + catch(RecordDeserializationException e) + { + log.error("{} - Deserialization Exception {}:{}", id, e.topicPartition(), e.offset()); + consumer.seek(e.topicPartition(), e.offset() +1); + } + catch(Exception e) + { + running = false; + } finally { - log.info("{} - Closing the KafkaConsumer", id); - consumer.close(); - log.info("{}: Consumed {} messages in total, exiting!", id, consumed); + lock.unlock(); } } @@ -114,11 +122,65 @@ public class ExampleConsumer implements Runnable } - public void shutdown() throws InterruptedException + public boolean isRunning() + { + return running; + } + + public void start() { - log.info("{} joining the worker-thread...", id); - running = false; - consumer.wakeup(); + lock.lock(); + try + { + if (workerThread != null) + { + throw new RuntimeException(id + + " - Worker thread is already running: " + + workerThread.toString()); + } + + log.info("{} starting the worker-thread...", id); + workerThread = new Thread(this, "ExampleConsumer Worker-Thread"); + workerThread.start(); + } + finally + { + lock.unlock(); + } + } + + public void stop() throws InterruptedException + { + lock.lock(); + try + { + if (workerThread == null) + { + throw new RuntimeException(id + + " - Worker thread is already running: " + + workerThread.toString()); + } + + if (running) + { + log.info("{} - Signaling the KafkaConsumer to finish its work", id); + consumer.wakeup(); + } + } + finally + { + lock.unlock(); + } + + log.info("{} - Joining the worker-thread...", id); workerThread.join(); + workerThread = null; + } + + public void shutdown() throws InterruptedException + { + log.info("{} - Closing the KafkaConsumer", id); + consumer.close(); + log.info("{}: Consumed {} messages in total, exiting!", id, consumed); } } -- 2.20.1