@Override
public void run()
{
- log.info("{} - Subscribing to topic test", id);
- consumer.subscribe(Arrays.asList(topic));
-
try
{
-
- running = true;
+ log.info("{} - Subscribing to topic test", id);
+ consumer.subscribe(Arrays.asList(topic));
while (running)
{
finally
{
log.info("{} - Unsubscribing...", id);
- consumer.unsubscribe();
running = false;
+ consumer.unsubscribe();
offset = null;
}
}
throw new RuntimeException("Consumier instance " + id + " is already running!");
log.info("Running {}", id);
+ running = true;
future = executor.submit(this);
}
future.get();
}
-
@PreDestroy
public void destroy() throws ExecutionException, InterruptedException
{