+ public synchronized void stop() throws ExecutionException, InterruptedException
+ {
+ boolean stateChanged = running.compareAndSet(true, false);
+ if (!stateChanged)
+ throw new RuntimeException("Consumer instance " + id + " is not running!");
+
+ log.info("{} - Stopping", id);
+ consumer.wakeup();
+ future.get();
+ log.info("{} - Stopped - consumed {} messages so far", id, consumed);
+ }
+
+ @PreDestroy
+ public void destroy() throws ExecutionException, InterruptedException
+ {
+ log.info("{} - Destroy!", id);
+ try
+ {
+ stop();
+ }
+ catch (IllegalStateException e)
+ {
+ log.info("{} - Was already stopped", id);
+ }
+ finally
+ {
+ log.info("{}: Consumed {} messages in total, exiting!", id, consumed);
+ }