From 7b017bc9954ecf26af755d43dd02999468cd94ea Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sun, 26 Jun 2022 18:07:32 +0200 Subject: [PATCH] splitter: 1.0.0-spring-integration - Added the key of the incoming message * Added a `ChannelInterceptor`, that copies the key from the received message to the outgoing message. * This fixes the test. --- .../splitter/SplitterApplication.java | 28 +++++++++++++++++++ 1 file changed, 28 insertions(+) diff --git a/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterApplication.java b/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterApplication.java index 412f429..0a70551 100644 --- a/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterApplication.java +++ b/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterApplication.java @@ -7,13 +7,20 @@ import org.springframework.context.annotation.Bean; import org.springframework.expression.common.LiteralExpression; import org.springframework.integration.annotation.InboundChannelAdapter; import org.springframework.integration.annotation.ServiceActivator; +import org.springframework.integration.channel.DirectChannel; import org.springframework.integration.config.EnableIntegration; import org.springframework.integration.kafka.inbound.KafkaMessageSource; import org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler; +import org.springframework.integration.support.MessageBuilder; import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.listener.ConsumerProperties; +import org.springframework.kafka.support.KafkaHeaders; +import org.springframework.messaging.Message; +import org.springframework.messaging.MessageChannel; import org.springframework.messaging.MessageHandler; +import org.springframework.messaging.MessageHeaders; +import org.springframework.messaging.support.ChannelInterceptor; @SpringBootApplication @@ -21,6 +28,27 @@ import org.springframework.messaging.MessageHandler; @EnableIntegration public class SplitterApplication { + @Bean + MessageChannel words() + { + DirectChannel words = new DirectChannel(); + words.addInterceptor(new ChannelInterceptor() + { + @Override + public Message preSend(Message message, MessageChannel channel) + { + MessageHeaders headers = message.getHeaders(); + Object key = headers.get(KafkaHeaders.RECEIVED_MESSAGE_KEY); + return + MessageBuilder + .fromMessage(message) + .setHeader(KafkaHeaders.MESSAGE_KEY, key) + .build(); + } + }); + return words; + } + @InboundChannelAdapter(channel = "recordings") @Bean KafkaMessageSource source( -- 2.20.1