1 package de.juplo.kafka;
3 import org.apache.kafka.clients.consumer.ConsumerRecord;
4 import org.springframework.boot.context.properties.EnableConfigurationProperties;
5 import org.springframework.context.annotation.Bean;
6 import org.springframework.context.annotation.Configuration;
7 import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
8 import org.springframework.kafka.config.KafkaListenerContainerFactory;
9 import org.springframework.kafka.core.ConsumerFactory;
10 import org.springframework.kafka.listener.CommonContainerStoppingErrorHandler;
12 import java.util.function.Consumer;
16 @EnableConfigurationProperties(ApplicationProperties.class)
17 public class ApplicationConfiguration
// Supplies a java.util.function.Consumer that accepts ConsumerRecord<String, Long>
// instances (String keys, Long values).
// NOTE(review): the method body is not visible in this excerpt -- confirm what the
// returned callback actually does with each record.
20 public Consumer<ConsumerRecord<String, Long>> consumer()
// Assembles a listener container factory for String keys and Long values that is
// backed by the supplied consumer factory and flagged for batch listening.
// Parameter consumerFactory: the ConsumerFactory handed to the new container factory.
// NOTE(review): the return statement is not visible in this excerpt -- presumably the
// configured `factory` is returned; confirm.
29 public KafkaListenerContainerFactory<?> batchFactory(ConsumerFactory<String, Long> consumerFactory)
// Concrete factory implementation; the generic arguments are inferred from the declared type.
31 ConcurrentKafkaListenerContainerFactory<String, Long> factory =
32 new ConcurrentKafkaListenerContainerFactory<>();
// Wire in the consumer factory received as the method argument.
34 factory.setConsumerFactory(consumerFactory);
// Mark this factory as a batch-listener factory.
35 factory.setBatchListener(true);
// Creates a fresh CommonContainerStoppingErrorHandler on every call.
// NOTE(review): going by the class name, this handler presumably stops the listener
// container when an error occurs -- confirm against the Spring Kafka documentation.
41 public CommonContainerStoppingErrorHandler errorHandler()
// No configuration is applied; the handler is built with its no-argument constructor.
43 return new CommonContainerStoppingErrorHandler();