X-Git-Url: http://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2FApplicationConfiguration.java;h=4c74eb2a66bec2a257eb05a0e83e01dbf7193a4e;hb=0224b257a2b0ef295765d1b3bfed7c4479c280e7;hp=5cefa3204842de29f9f84c1e8755ff282f428133;hpb=a9200a876060edc8683dfd6d0d16c23407c189ad;p=demos%2Fkafka%2Ftraining diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java index 5cefa32..4c74eb2 100644 --- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java @@ -4,6 +4,10 @@ import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; +import org.springframework.kafka.config.KafkaListenerContainerFactory; +import org.springframework.kafka.core.ConsumerFactory; +import org.springframework.kafka.listener.CommonContainerStoppingErrorHandler; import java.util.function.Consumer; @@ -20,4 +24,22 @@ public class ApplicationConfiguration // Handle record }; } + + @Bean + public KafkaListenerContainerFactory batchFactory(ConsumerFactory consumerFactory) + { + ConcurrentKafkaListenerContainerFactory factory = + new ConcurrentKafkaListenerContainerFactory<>(); + + factory.setConsumerFactory(consumerFactory); + factory.setBatchListener(true); + + return factory; + } + + @Bean + public CommonContainerStoppingErrorHandler errorHandler() + { + return new CommonContainerStoppingErrorHandler(); + } }