private final String topic;
private final Consumer<String, String> consumer;
+ private volatile boolean running = false;
private long consumed = 0;
{
log.info("{} - Subscribing to topic test", id);
consumer.subscribe(Arrays.asList(topic));
+ running = true;
while (true)
{
}
finally
{
+ running = false;
log.info("{} - Closing the KafkaConsumer", id);
consumer.close();
log.info("{}: Consumed {} messages in total, exiting!", id, consumed);
}
}
+
+ public boolean isRunning()
+ {
+ return running;
+ }
}