From bc490d203bb6fbe99c397310d0aa1d3081390f03 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Wed, 20 Jul 2022 21:02:42 +0200 Subject: [PATCH] splitter: 1.0.0-spring-ingetration - fixed the ordering of the messages * Added a `ChannelInterceptor` to the `KafkaSubscribableChannel`, that adds the value of the `key`-Header as Header `kafka_messageKey`). * This fixes the ordering, because now, all words, that are split of from a recording of a user are send with the same key. --- .../splitter/SplitterApplication.java | 32 +++++++++++++++++-- 1 file changed, 30 insertions(+), 2 deletions(-) diff --git a/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterApplication.java b/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterApplication.java index 3dcb443..cfe7a4a 100644 --- a/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterApplication.java +++ b/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterApplication.java @@ -28,11 +28,15 @@ import org.springframework.kafka.listener.KafkaMessageListenerContainer; import org.springframework.integration.kafka.channel.SubscribableKafkaChannel; import org.springframework.kafka.config.KafkaListenerContainerFactory; import org.springframework.kafka.listener.AbstractMessageListenerContainer; +import org.springframework.messaging.Message; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.MessageHandler; import org.springframework.messaging.support.ChannelInterceptor; +import org.springframework.messaging.support.MessageBuilder; +import java.util.ArrayList; import java.util.HashMap; +import java.util.List; import java.util.Map; @@ -79,11 +83,15 @@ public class SplitterApplication @Bean SubscribableKafkaChannel recordings( KafkaTemplate kafkaTemplate, - KafkaListenerContainerFactory> containerFactory) + KafkaListenerContainerFactory> containerFactory, + ChannelInterceptor messageKeyInterceptor) { SubscribableKafkaChannel channel = new SubscribableKafkaChannel(kafkaTemplate, containerFactory, "channel-recordings"); channel.setGroupId("recordings"); + List interceptors = new ArrayList<>(1); + interceptors.add(messageKeyInterceptor); + channel.setInterceptors(interceptors); return channel; } @@ -105,11 +113,15 @@ public class SplitterApplication @Bean SubscribableKafkaChannel words( KafkaTemplate kafkaTemplate, - KafkaListenerContainerFactory> containerFactory) + KafkaListenerContainerFactory> containerFactory, + ChannelInterceptor messageKeyInterceptor) { SubscribableKafkaChannel channel = new SubscribableKafkaChannel(kafkaTemplate, containerFactory, "channel-words"); channel.setGroupId("words"); + List interceptors = new ArrayList<>(1); + interceptors.add(messageKeyInterceptor); + channel.setInterceptors(interceptors); return channel; } @@ -119,6 +131,22 @@ public class SplitterApplication return new DirectChannel(); } + @Bean + ChannelInterceptor messageKeyInterceptor() + { + return new ChannelInterceptor() { + @Override + public Message preSend(Message message, MessageChannel channel) + { + String key = message.getHeaders().get("key", String.class); + return MessageBuilder + .fromMessage(message) + .setHeader("kafka_messageKey", key) + .build(); + } + }; + } + @GlobalChannelInterceptor @Bean ChannelInterceptor globalLoggingWireTap(MessageChannel messageLog) -- 2.20.1