splitter: 1.0.0-spring-integration - Added the key of the incoming message
authorKai Moritz <kai@juplo.de>
Sun, 26 Jun 2022 16:07:32 +0000 (18:07 +0200)
committerKai Moritz <kai@juplo.de>
Thu, 30 Jun 2022 19:33:38 +0000 (21:33 +0200)
* Added a `ChannelInterceptor`, that copies the key from the received
  message to the outgoing message.
* This fixes the test.

src/main/java/de/juplo/kafka/wordcount/splitter/SplitterApplication.java

index 412f429..0a70551 100644 (file)
@@ -7,13 +7,20 @@ import org.springframework.context.annotation.Bean;
 import org.springframework.expression.common.LiteralExpression;
 import org.springframework.integration.annotation.InboundChannelAdapter;
 import org.springframework.integration.annotation.ServiceActivator;
+import org.springframework.integration.channel.DirectChannel;
 import org.springframework.integration.config.EnableIntegration;
 import org.springframework.integration.kafka.inbound.KafkaMessageSource;
 import org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler;
+import org.springframework.integration.support.MessageBuilder;
 import org.springframework.kafka.core.ConsumerFactory;
 import org.springframework.kafka.core.KafkaTemplate;
 import org.springframework.kafka.listener.ConsumerProperties;
+import org.springframework.kafka.support.KafkaHeaders;
+import org.springframework.messaging.Message;
+import org.springframework.messaging.MessageChannel;
 import org.springframework.messaging.MessageHandler;
+import org.springframework.messaging.MessageHeaders;
+import org.springframework.messaging.support.ChannelInterceptor;
 
 
 @SpringBootApplication
@@ -21,6 +28,27 @@ import org.springframework.messaging.MessageHandler;
 @EnableIntegration
 public class SplitterApplication
 {
+       @Bean
+       MessageChannel words()
+       {
+               DirectChannel words = new DirectChannel();
+               words.addInterceptor(new ChannelInterceptor()
+               {
+                       @Override
+                       public Message<?> preSend(Message<?> message, MessageChannel channel)
+                       {
+                               MessageHeaders headers = message.getHeaders();
+                               Object key = headers.get(KafkaHeaders.RECEIVED_MESSAGE_KEY);
+                               return
+                                               MessageBuilder
+                                                               .fromMessage(message)
+                                                               .setHeader(KafkaHeaders.MESSAGE_KEY, key)
+                                                               .build();
+                       }
+               });
+               return words;
+       }
+
        @InboundChannelAdapter(channel = "recordings")
        @Bean
        KafkaMessageSource<String, String> source(