+ @Autowired
+ KafkaListenerEndpointRegistry registry;
+ @Value("${consumer.client-id}")
+ String clientId;
+
+
+ @Override
+ public void run(ApplicationArguments args) throws Exception
+ {
+ log.info("Starting EndlessConsumer");
+ this.registry.getListenerContainer(clientId).start();
+ }
+
+ @PreDestroy
+ public void stopExecutor()
+ {
+ log.info("Stopping EndlessConsumer");
+ this.registry.getListenerContainer(clientId).stop();
+ }
+
+