import java.time.Duration;
import java.util.Arrays;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
@Slf4j
private final Consumer<String, Long> consumer;
private final RecordHandler<String, Long> recordHandler;
private final ConsumerRebalanceListener rebalanceListener;
- private final Thread workerThread;
private final Runnable closeCallback;
+ private final Lock lock = new ReentrantLock(true);
+ private Thread workerThread;
private volatile boolean running = false;
private long consumed = 0;
this.consumer = consumer;
this.recordHandler = recordHandler;
this.rebalanceListener = rebalanceListener;
-
- workerThread = new Thread(this, "ExampleConsumer Worker-Thread");
- workerThread.start();
-
this.closeCallback = closeCallback;
+
+ start();
}
while (running)
{
- try
- {
- ConsumerRecords<String, Long> records =
- consumer.poll(Duration.ofSeconds(1));
-
- log.info("{} - Received {} messages", id, records.count());
- for (ConsumerRecord<String, Long> record : records)
- {
- handleRecord(
- record.topic(),
- record.partition(),
- record.offset(),
- record.key(),
- record.value());
- }
- }
- catch(RecordDeserializationException e)
- {
- log.error("{} - Deserialization Exception {}:{}", id, e.topicPartition(), e.offset());
- consumer.seek(e.topicPartition(), e.offset() +1);
- }
+ pollAndHandleRecords();
}
}
catch(WakeupException e)
log.info("{} - Triggering exit of application!", id);
new Thread(closeCallback).start();
}
+ }
+
+ private void pollAndHandleRecords()
+ {
+ lock.lock();
+ try
+ {
+ ConsumerRecords<String, Long> records = consumer.poll(Duration.ofSeconds(1));
+
+ log.info("{} - Received {} messages", id, records.count());
+ for (ConsumerRecord<String, Long> record : records)
+ {
+ handleRecord(
+ record.topic(),
+ record.partition(),
+ record.offset(),
+ record.key(),
+ record.value());
+ }
+ }
+ catch(RecordDeserializationException e)
+ {
+ log.error("{} - Deserialization Exception {}:{}", id, e.topicPartition(), e.offset());
+ consumer.seek(e.topicPartition(), e.offset() +1);
+ }
+ catch(Exception e)
+ {
+ running = false;
+ }
finally
{
- log.info("{} - Closing the KafkaConsumer", id);
- consumer.close();
- log.info("{}: Consumed {} messages in total, exiting!", id, consumed);
+ lock.unlock();
}
}
}
- public void shutdown() throws InterruptedException
+ public boolean isRunning()
+ {
+ return running;
+ }
+
+ public void start()
{
- log.info("{} joining the worker-thread...", id);
- running = false;
- consumer.wakeup();
+ lock.lock();
+ try
+ {
+ if (workerThread != null)
+ {
+ throw new RuntimeException(id
+ + " - Worker thread is already running: "
+ + workerThread.toString());
+ }
+
+ log.info("{} starting the worker-thread...", id);
+ workerThread = new Thread(this, "ExampleConsumer Worker-Thread");
+ workerThread.start();
+ }
+ finally
+ {
+ lock.unlock();
+ }
+ }
+
+ public void stop() throws InterruptedException
+ {
+ lock.lock();
+ try
+ {
+ if (workerThread == null)
+ {
+ throw new RuntimeException(id
+ + " - Worker thread is already running: "
+ + workerThread.toString());
+ }
+
+ if (running)
+ {
+ log.info("{} - Signaling the KafkaConsumer to finish its work", id);
+ consumer.wakeup();
+ }
+ }
+ finally
+ {
+ lock.unlock();
+ }
+
+ log.info("{} - Joining the worker-thread...", id);
workerThread.join();
+ workerThread = null;
+ }
+
+ public void shutdown() throws InterruptedException
+ {
+ log.info("{} - Closing the KafkaConsumer", id);
+ consumer.close();
+ log.info("{}: Consumed {} messages in total, exiting!", id, consumed);
}
}