splitter: 1.0.0-spring-ingetration - using a `message-key-expression`
authorKai Moritz <kai@juplo.de>
Sun, 17 Jul 2022 15:11:03 +0000 (17:11 +0200)
committerKai Moritz <kai@juplo.de>
Sun, 17 Jul 2022 15:11:23 +0000 (17:11 +0200)
* The configuration can be simplified
* The `ChannelInterceptor` can be omitted, if a `message-key-expression`
  is specivied for the outbound channel.

src/main/java/de/juplo/kafka/wordcount/splitter/SplitterApplication.java

index d9ba135..246fffd 100644 (file)
@@ -4,23 +4,19 @@ import org.springframework.boot.SpringApplication;
 import org.springframework.boot.autoconfigure.SpringBootApplication;
 import org.springframework.boot.context.properties.EnableConfigurationProperties;
 import org.springframework.context.annotation.Bean;
+import org.springframework.expression.Expression;
+import org.springframework.expression.ExpressionParser;
 import org.springframework.expression.common.LiteralExpression;
+import org.springframework.expression.spel.standard.SpelExpressionParser;
 import org.springframework.integration.annotation.InboundChannelAdapter;
 import org.springframework.integration.annotation.ServiceActivator;
-import org.springframework.integration.channel.DirectChannel;
 import org.springframework.integration.config.EnableIntegration;
 import org.springframework.integration.kafka.inbound.KafkaMessageSource;
 import org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler;
-import org.springframework.integration.support.MessageBuilder;
 import org.springframework.kafka.core.ConsumerFactory;
 import org.springframework.kafka.core.KafkaTemplate;
 import org.springframework.kafka.listener.ConsumerProperties;
-import org.springframework.kafka.support.KafkaHeaders;
-import org.springframework.messaging.Message;
-import org.springframework.messaging.MessageChannel;
 import org.springframework.messaging.MessageHandler;
-import org.springframework.messaging.MessageHeaders;
-import org.springframework.messaging.support.ChannelInterceptor;
 
 
 @SpringBootApplication
@@ -28,33 +24,6 @@ import org.springframework.messaging.support.ChannelInterceptor;
 @EnableIntegration
 public class SplitterApplication
 {
-       @Bean
-       MessageChannel words(ChannelInterceptor keyInterceptor)
-       {
-               DirectChannel words = new DirectChannel();
-               words.addInterceptor(keyInterceptor);
-               return words;
-       }
-
-       @Bean
-       ChannelInterceptor keyInterceptor ()
-       {
-               return new ChannelInterceptor()
-               {
-                       @Override
-                       public Message<?> preSend(Message<?> message, MessageChannel channel)
-                       {
-                               MessageHeaders headers = message.getHeaders();
-                               Object key = headers.get(KafkaHeaders.RECEIVED_MESSAGE_KEY);
-                               return
-                                               MessageBuilder
-                                                               .fromMessage(message)
-                                                               .setHeader(KafkaHeaders.MESSAGE_KEY, key)
-                                                               .build();
-                       }
-               };
-       }
-
        @InboundChannelAdapter(channel = "recordings")
        @Bean
        KafkaMessageSource<String, String> source(
@@ -73,6 +42,9 @@ public class SplitterApplication
                KafkaProducerMessageHandler<String, String> handler =
                                new KafkaProducerMessageHandler<>(kafkaTemplate);
                handler.setTopicExpression(new LiteralExpression(properties.getOutputTopic()));
+               final ExpressionParser parser = new SpelExpressionParser();
+               Expression expression = parser.parseExpression("headers['kafka_receivedMessageKey']");
+               handler.setMessageKeyExpression(expression);
                return handler;
        }