From: Kai Moritz Date: Sun, 17 Jul 2022 17:03:18 +0000 (+0200) Subject: splitter: 1.0.0-spring-ingetration - message-driven channel-adapter X-Git-Tag: wip-header-enricher~5 X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=0bcf271cffe6d7263d76dc6618af9a03b4cb6179;p=demos%2Fkafka%2Fwordcount splitter: 1.0.0-spring-ingetration - message-driven channel-adapter * Instead of the `@InboundChannelAdapter`-annotation, a `KafkaMessageDrivenChannelAdapter` can be instanciated. * The adapter takes a `AbstractMessageListenerContainer` as constructor argument. * The output-channel must be set explicitly on the created instance. * This approach is advantageous, if technical details are important, because the used `MessageListenerContainer` can be configured explicitly. --- diff --git a/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterApplication.java b/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterApplication.java index 246fffd..b5948c3 100644 --- a/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterApplication.java +++ b/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterApplication.java @@ -8,14 +8,15 @@ import org.springframework.expression.Expression; import org.springframework.expression.ExpressionParser; import org.springframework.expression.common.LiteralExpression; import org.springframework.expression.spel.standard.SpelExpressionParser; -import org.springframework.integration.annotation.InboundChannelAdapter; import org.springframework.integration.annotation.ServiceActivator; import org.springframework.integration.config.EnableIntegration; -import org.springframework.integration.kafka.inbound.KafkaMessageSource; +import org.springframework.integration.core.MessageProducer; +import org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter; import org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler; import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.core.KafkaTemplate; -import org.springframework.kafka.listener.ConsumerProperties; +import org.springframework.kafka.listener.ContainerProperties; +import org.springframework.kafka.listener.KafkaMessageListenerContainer; import org.springframework.messaging.MessageHandler; @@ -24,15 +25,26 @@ import org.springframework.messaging.MessageHandler; @EnableIntegration public class SplitterApplication { - @InboundChannelAdapter(channel = "recordings") @Bean - KafkaMessageSource source( - ConsumerFactory cf, - SplitterApplicationProperties properties) + MessageProducer messageProducer( + KafkaMessageListenerContainer messageListenerContainer) { - return new KafkaMessageSource<>(cf, new ConsumerProperties(properties.getInputTopic())); + KafkaMessageDrivenChannelAdapter adapter = + new KafkaMessageDrivenChannelAdapter(messageListenerContainer); + adapter.setOutputChannelName("recordings"); + return adapter; } + @Bean + KafkaMessageListenerContainer messageListenerContainer( + ConsumerFactory consumerFactory, + SplitterApplicationProperties properties) + { + return + new KafkaMessageListenerContainer<>( + consumerFactory, + new ContainerProperties(properties.getInputTopic())); + } @Bean @ServiceActivator(inputChannel = "words") MessageHandler handler(