private final String topic;
private final Consumer<String, Message> consumer;
- private volatile boolean running = false;
private long consumed = 0;
try
{
log.info("{} - Subscribing to topic test", id);
- consumer.subscribe(Arrays.asList("test"));
- running = true;
+ consumer.subscribe(Arrays.asList(topic));
while (true)
{
}
finally
{
- running = false;
log.info("{} - Closing the KafkaConsumer", id);
consumer.close();
log.info("{}: Consumed {} messages in total, exiting!", id, consumed);
}
}
-
- public void start()
- {
- executor.submit(this);
- }
}