- log.info("{} - Stopping", id);
- consumer.wakeup();
- condition.await();
- log.info("{} - Stopped - consumed {} messages so far", id, consumed);
- }
- finally
- {
- lock.unlock();
- }
- }
-
- @PreDestroy
- public void destroy() throws ExecutionException, InterruptedException
- {
- log.info("{} - Destroy!", id);
- try
- {
- stop();
- }
- catch (IllegalStateException e)
- {
- log.info("{} - Was already stopped", id);
- }
- catch (Exception e)
- {
- log.error("{} - Unexpected exception while trying to stop the consumer", id, e);
- }
- finally
- {
- log.info("{}: Consumed {} messages in total, exiting!", id, consumed);
- }
- }
-
- public boolean running()
- {
- lock.lock();
- try
- {
- return running;
- }
- finally
- {
- lock.unlock();
- }
- }
-
- public Optional<Exception> exitStatus()
- {
- lock.lock();
- try
- {
- if (running)
- throw new IllegalStateException("No exit-status available: Consumer instance " + id + " is running!");
-
- return Optional.ofNullable(exception);
- }
- finally
- {
- lock.unlock();
- }
+ log.info("{} - Stopping", id);
+ registry.getListenerContainer(id).stop();
+ log.info("{} - Stopped - consumed {} messages so far", id, consumed);