X-Git-Url: http://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2FApplicationConfiguration.java;h=4c74eb2a66bec2a257eb05a0e83e01dbf7193a4e;hb=0224b257a2b0ef295765d1b3bfed7c4479c280e7;hp=fd4ff28659a84e14b6ed6f7aadf96a809f128827;hpb=ceb3caf09c5b8594493c6d98a1dd06f178b5f2d0;p=demos%2Fkafka%2Ftraining diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java index fd4ff28..4c74eb2 100644 --- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java @@ -4,6 +4,9 @@ import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; +import org.springframework.kafka.config.KafkaListenerContainerFactory; +import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.listener.CommonContainerStoppingErrorHandler; import java.util.function.Consumer; @@ -22,6 +25,18 @@ public class ApplicationConfiguration }; } + @Bean + public KafkaListenerContainerFactory batchFactory(ConsumerFactory consumerFactory) + { + ConcurrentKafkaListenerContainerFactory factory = + new ConcurrentKafkaListenerContainerFactory<>(); + + factory.setConsumerFactory(consumerFactory); + factory.setBatchListener(true); + + return factory; + } + @Bean public CommonContainerStoppingErrorHandler errorHandler() {