import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.stereotype.Component;
-import javax.annotation.PreDestroy;
import java.util.List;
import java.util.function.Consumer;
registry.getListenerContainer(id).stop();
log.info("{} - Stopped - consumed {} messages so far", id, consumed);
}
-
- @PreDestroy
- public void destroy()
- {
- log.info("{} - Destroy!", id);
- try
- {
- stop();
- }
- catch (IllegalStateException e)
- {
- log.info("{} - Was already stopped", id);
- }
- catch (Exception e)
- {
- log.error("{} - Unexpected exception while trying to stop the consumer", id, e);
- }
- finally
- {
- log.info("{}: Consumed {} messages in total, exiting!", id, consumed);
- }
- }
}