+ @Bean
+ public EndlessConsumer<String, Message> endlessConsumer(
+ Consumer<String, Message> kafkaConsumer,
+ ExecutorService executor,
+ ApplicationRebalanceListener rebalanceListener,
+ RecordHandler recordHandler,
+ KafkaProperties kafkaProperties,
+ ApplicationProperties applicationProperties)
+ {
+ return
+ new EndlessConsumer<>(
+ executor,
+ kafkaProperties.getClientId(),
+ applicationProperties.getTopic(),
+ kafkaConsumer,
+ rebalanceListener,
+ recordHandler);
+ }
+
+ @Bean
+ public ExecutorService executor()
+ {
+ return Executors.newSingleThreadExecutor();
+ }
+
+ @Bean(destroyMethod = "close")
+ public Consumer<String, Message> kafkaConsumer(ConsumerFactory<String, Message> factory)
+ {
+ return factory.createConsumer();