import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.ConsumerFactory;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import java.util.function.Consumer;
public class ApplicationConfiguration
{
@Bean
- public Consumer<ConsumerRecord<String, Long>> consumer()
+ public Consumer<ConsumerRecord<String, ClientMessage>> consumer()
{
return (record) ->
{
}
@Bean
- public EndlessConsumer<String, Long> endlessConsumer(
- org.apache.kafka.clients.consumer.Consumer<String, Long> kafkaConsumer,
- ExecutorService executor,
- Consumer<ConsumerRecord<String, Long>> handler,
- KafkaProperties kafkaProperties,
- ApplicationProperties applicationProperties)
+ public ApplicationErrorHandler errorHandler()
{
- return
- new EndlessConsumer<>(
- executor,
- kafkaProperties.getConsumer().getClientId(),
- applicationProperties.getTopic(),
- kafkaConsumer,
- handler);
- }
-
- @Bean
- public ExecutorService executor()
- {
- return Executors.newSingleThreadExecutor();
+ return new ApplicationErrorHandler();
}
@Bean(destroyMethod = "close")
- public org.apache.kafka.clients.consumer.Consumer<String, Long> kafkaConsumer(ConsumerFactory<String, Long> factory)
+ public org.apache.kafka.clients.consumer.Consumer<String, ClientMessage> kafkaConsumer(ConsumerFactory<String, ClientMessage> factory)
{
return factory.createConsumer();
}