1 package de.juplo.kafka;
3 import org.apache.kafka.clients.consumer.ConsumerRecord;
4 import org.springframework.boot.context.properties.EnableConfigurationProperties;
5 import org.springframework.context.annotation.Bean;
6 import org.springframework.context.annotation.Configuration;
7 import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
8 import org.springframework.kafka.config.KafkaListenerContainerFactory;
9 import org.springframework.kafka.core.ConsumerFactory;
10 import org.springframework.kafka.listener.CommonContainerStoppingErrorHandler;
11 import org.springframework.kafka.listener.CommonErrorHandler;
13 import java.util.function.Consumer;
/**
 * Configuration class of the {@code de.juplo.kafka} application.
 *
 * Enables {@link ApplicationProperties} as configuration properties and
 * declares the methods {@code consumer()}, {@code batchFactory(...)} and
 * {@code errorHandler()}.
 *
 * NOTE(review): further annotations on this class and its methods are not
 * part of the visible excerpt; presumably the methods are bean definitions --
 * confirm against the full file.
 */
17 @EnableConfigurationProperties(ApplicationProperties.class)
18 public class ApplicationConfiguration
/**
 * Supplies a {@link Consumer} that accepts {@link ConsumerRecord}s with
 * {@code String} keys and {@code Long} values.
 *
 * NOTE(review): the method body is not part of the visible excerpt, so what
 * the returned consumer does with a record must be confirmed against the
 * full file.
 *
 * @return the consumer for {@code ConsumerRecord<String, Long>}
 */
21 public Consumer<ConsumerRecord<String, Long>> consumer()
/**
 * Configures a {@link ConcurrentKafkaListenerContainerFactory} for
 * {@code String} keys and {@code Long} values with batch listening enabled.
 *
 * @param consumerFactory the consumer factory that is set on the container factory
 * @param errorHandler the error handler that is set on the container factory
 * @return NOTE(review): the return statement is not part of the visible
 *         excerpt; presumably the configured {@code factory} -- confirm
 */
30 public KafkaListenerContainerFactory<?> batchFactory(
31 ConsumerFactory<String, Long> consumerFactory,
32 CommonErrorHandler errorHandler)
34 ConcurrentKafkaListenerContainerFactory<String, Long> factory =
35 new ConcurrentKafkaListenerContainerFactory<>();
// Wire in the collaborators passed in as parameters.
37 factory.setConsumerFactory(consumerFactory);
38 factory.setCommonErrorHandler(errorHandler);
// Switch the factory to batch listeners.
39 factory.setBatchListener(true);
/**
 * Creates the error handler of the application.
 *
 * @return a new {@link CommonContainerStoppingErrorHandler}, created with its
 *         no-argument constructor
 */
45 public CommonContainerStoppingErrorHandler errorHandler()
47 return new CommonContainerStoppingErrorHandler();