import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
+import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.support.serializer.JsonDeserializer;
public EndlessConsumer<String, Message> endlessConsumer(
KafkaConsumer<String, Message> kafkaConsumer,
ExecutorService executor,
+ ConfigurableApplicationContext applicationContext,
ApplicationRebalanceListener rebalanceListener,
ApplicationRecordHandler recordHandler,
ApplicationProperties properties)
return
new EndlessConsumer<>(
executor,
+ applicationContext,
properties.getClientId(),
properties.getTopic(),
kafkaConsumer,