import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.errors.WakeupException;
+import org.springframework.context.SmartLifecycle;
import java.time.Duration;
import java.util.Arrays;
@Slf4j
-public class ExampleConsumer implements Runnable
+public class ExampleConsumer implements Runnable, SmartLifecycle
{
private final String id;
private final String topic;
private final Consumer<String, String> consumer;
- private final Thread workerThread;
- private final Runnable closeCallback;
+ private Thread workerThread;
+ private volatile boolean running = false;
private long consumed = 0;
public ExampleConsumer(
String clientId,
String topic,
- Consumer<String, String> consumer,
- Runnable closeCallback)
+ Consumer<String, String> consumer)
{
this.id = clientId;
this.topic = topic;
this.consumer = consumer;
+ }
+ @Override
+ public synchronized void start()
+ {
+ if (running)
+ {
+ log.info("{} - Already running!", id);
+ return;
+ }
+ running = true;
workerThread = new Thread(this, "ExampleConsumer Worker-Thread");
workerThread.start();
-
- this.closeCallback = closeCallback;
}
{
log.error("{} - Unexpected error, unsubscribing!", id, e);
consumer.unsubscribe();
- log.info("{} - Triggering exit of application!", id);
- new Thread(closeCallback).start();
}
finally
{
}
- public void shutdown() throws InterruptedException
+ @Override
+ public boolean isRunning()
+ {
+ return running;
+ }
+
+ @Override
+ public synchronized void stop()
{
+ if (!running)
+ {
+ log.info("{} - Not running!", id);
+ return;
+ }
+
+ running = false;
+
log.info("{} - Waking up the consumer", id);
consumer.wakeup();
+
log.info("{} - Joining the worker thread", id);
- workerThread.join();
+ try
+ {
+ workerThread.join();
+ }
+ catch (InterruptedException e)
+ {
+ Thread.currentThread().interrupt();
+ log.error("{} - Interrupted while waiting for worker thread", id, e);
+ }
}
}