private final Thread workerThread;
private final Runnable closeCallback;
- private volatile boolean running = false;
private long consumed = 0;
{
log.info("{} - Subscribing to topic {}", id, topic);
consumer.subscribe(Arrays.asList(topic));
- running = true;
- while (running)
+ while (true)
{
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
public void shutdown() throws InterruptedException
{
- log.info("{} joining the worker-thread...", id);
- running = false;
+ log.info("{} - Waking up the consumer", id);
consumer.wakeup();
+ log.info("{} - Joining the worker thread", id);
workerThread.join();
}
}