X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fwordcount%2Fsplitter%2FSplitterApplication.java;h=3828050e00aa90529924cfc0f65d49568e2da254;hb=5c1e4e203c3c0d7c07b4ce1293a4a120faf1db3c;hp=246fffd66a37442954eb689ace8c86cd1dd34736;hpb=9f82f66f1a9ed3194889fee1b69a10287c4b03db;p=demos%2Fkafka%2Fwordcount diff --git a/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterApplication.java b/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterApplication.java index 246fffd..3828050 100644 --- a/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterApplication.java +++ b/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterApplication.java @@ -8,14 +8,15 @@ import org.springframework.expression.Expression; import org.springframework.expression.ExpressionParser; import org.springframework.expression.common.LiteralExpression; import org.springframework.expression.spel.standard.SpelExpressionParser; -import org.springframework.integration.annotation.InboundChannelAdapter; import org.springframework.integration.annotation.ServiceActivator; import org.springframework.integration.config.EnableIntegration; -import org.springframework.integration.kafka.inbound.KafkaMessageSource; +import org.springframework.integration.core.MessageProducer; +import org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter; import org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler; import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.core.KafkaTemplate; -import org.springframework.kafka.listener.ConsumerProperties; +import org.springframework.kafka.listener.ContainerProperties; +import org.springframework.kafka.listener.KafkaMessageListenerContainer; import org.springframework.messaging.MessageHandler; @@ -24,13 +25,25 @@ import org.springframework.messaging.MessageHandler; @EnableIntegration public class SplitterApplication { - @InboundChannelAdapter(channel = "recordings") @Bean - KafkaMessageSource source( - ConsumerFactory cf, + MessageProducer messageProducer( + KafkaMessageListenerContainer messageListenerContainer) + { + KafkaMessageDrivenChannelAdapter adapter = + new KafkaMessageDrivenChannelAdapter(messageListenerContainer); + adapter.setOutputChannelName("recordings"); + return adapter; + } + + @Bean + KafkaMessageListenerContainer messageListenerContainer( + ConsumerFactory consumerFactory, SplitterApplicationProperties properties) { - return new KafkaMessageSource<>(cf, new ConsumerProperties(properties.getInputTopic())); + return + new KafkaMessageListenerContainer<>( + consumerFactory, + new ContainerProperties(properties.getInputTopic())); } @Bean