+ public void start()
+ {
+ lock.lock();
+ try
+ {
+ if (running)
+ throw new IllegalStateException("Consumer instance " + id + " is already running!");
+
+ log.info("{} - Starting - consumed {} messages before", id, consumed);
+ running = true;
+ exception = null;
+ executor.submit(this);
+ }
+ finally
+ {
+ lock.unlock();
+ }
+ }
+
+ public synchronized void stop() throws InterruptedException
+ {
+ lock.lock();
+ try
+ {
+ if (!running)
+ throw new IllegalStateException("Consumer instance " + id + " is not running!");
+
+ log.info("{} - Stopping", id);
+ consumer.wakeup();
+ condition.await();
+ log.info("{} - Stopped - consumed {} messages so far", id, consumed);
+ }
+ finally
+ {
+ lock.unlock();
+ }
+ }
+
+ @PreDestroy
+ public void destroy() throws ExecutionException, InterruptedException
+ {
+ log.info("{} - Destroy!", id);
+ log.info("{}: Consumed {} messages in total, exiting!", id, consumed);
+ }
+
+ public boolean running()
+ {
+ lock.lock();
+ try
+ {
+ return running;
+ }
+ finally
+ {
+ lock.unlock();
+ }
+ }
+
+ public Optional<Exception> exitStatus()
+ {
+ lock.lock();
+ try
+ {
+ if (running)
+ throw new IllegalStateException("No exit-status available: Consumer instance " + id + " is running!");
+
+ return Optional.ofNullable(exception);
+ }
+ finally
+ {
+ lock.unlock();
+ }