splitter: 1.0.0-spring-ingetration - message-driven channel-adapter
authorKai Moritz <kai@juplo.de>
Sun, 17 Jul 2022 17:03:18 +0000 (19:03 +0200)
committerKai Moritz <kai@juplo.de>
Mon, 18 Jul 2022 19:33:52 +0000 (21:33 +0200)
* Instead of the `@InboundChannelAdapter`-annotation, a
  `KafkaMessageDrivenChannelAdapter` can be instanciated.
* The adapter takes a `AbstractMessageListenerContainer` as constructor
  argument.
* The output-channel must be set explicitly on the created instance.
* This approach is advantageous, if technical details are important,
  because the used `MessageListenerContainer` can be configured explicitly.

src/main/java/de/juplo/kafka/wordcount/splitter/SplitterApplication.java

index 246fffd..3828050 100644 (file)
@@ -8,14 +8,15 @@ import org.springframework.expression.Expression;
 import org.springframework.expression.ExpressionParser;
 import org.springframework.expression.common.LiteralExpression;
 import org.springframework.expression.spel.standard.SpelExpressionParser;
-import org.springframework.integration.annotation.InboundChannelAdapter;
 import org.springframework.integration.annotation.ServiceActivator;
 import org.springframework.integration.config.EnableIntegration;
-import org.springframework.integration.kafka.inbound.KafkaMessageSource;
+import org.springframework.integration.core.MessageProducer;
+import org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter;
 import org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler;
 import org.springframework.kafka.core.ConsumerFactory;
 import org.springframework.kafka.core.KafkaTemplate;
-import org.springframework.kafka.listener.ConsumerProperties;
+import org.springframework.kafka.listener.ContainerProperties;
+import org.springframework.kafka.listener.KafkaMessageListenerContainer;
 import org.springframework.messaging.MessageHandler;
 
 
@@ -24,13 +25,25 @@ import org.springframework.messaging.MessageHandler;
 @EnableIntegration
 public class SplitterApplication
 {
-       @InboundChannelAdapter(channel = "recordings")
        @Bean
-       KafkaMessageSource<String, String> source(
-                       ConsumerFactory<String, String> cf,
+       MessageProducer messageProducer(
+                       KafkaMessageListenerContainer<String, String> messageListenerContainer)
+       {
+               KafkaMessageDrivenChannelAdapter<String, String> adapter =
+                               new KafkaMessageDrivenChannelAdapter(messageListenerContainer);
+               adapter.setOutputChannelName("recordings");
+               return adapter;
+       }
+
+       @Bean
+       KafkaMessageListenerContainer<String, String> messageListenerContainer(
+                       ConsumerFactory<String,String> consumerFactory,
                        SplitterApplicationProperties properties)
        {
-               return new KafkaMessageSource<>(cf, new ConsumerProperties(properties.getInputTopic()));
+               return
+                               new KafkaMessageListenerContainer<>(
+                                               consumerFactory,
+                                               new ContainerProperties(properties.getInputTopic()));
        }
 
        @Bean