From: Kai Moritz Date: Wed, 20 Jul 2022 19:02:42 +0000 (+0200) Subject: splitter: 1.0.0-spring-ingetration - fixed the ordering of the messages X-Git-Url: https://juplo.de/gitweb/?p=demos%2Fkafka%2Fwordcount;a=commitdiff_plain;h=refs%2Fheads%2Fsplitter-spring-integration splitter: 1.0.0-spring-ingetration - fixed the ordering of the messages * Added a `ChannelInterceptor` to the `KafkaSubscribableChannel`, that adds the value of the `key`-Header as Header `kafka_messageKey`). * This fixes the ordering, because now, all words, that are split of from a recording of a user are send with the same key. --- diff --git a/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterApplication.java b/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterApplication.java index 3dcb443..cfe7a4a 100644 --- a/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterApplication.java +++ b/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterApplication.java @@ -28,11 +28,15 @@ import org.springframework.kafka.listener.KafkaMessageListenerContainer; import org.springframework.integration.kafka.channel.SubscribableKafkaChannel; import org.springframework.kafka.config.KafkaListenerContainerFactory; import org.springframework.kafka.listener.AbstractMessageListenerContainer; +import org.springframework.messaging.Message; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.MessageHandler; import org.springframework.messaging.support.ChannelInterceptor; +import org.springframework.messaging.support.MessageBuilder; +import java.util.ArrayList; import java.util.HashMap; +import java.util.List; import java.util.Map; @@ -79,11 +83,15 @@ public class SplitterApplication @Bean SubscribableKafkaChannel recordings( KafkaTemplate kafkaTemplate, - KafkaListenerContainerFactory> containerFactory) + KafkaListenerContainerFactory> containerFactory, + ChannelInterceptor messageKeyInterceptor) { SubscribableKafkaChannel channel = new SubscribableKafkaChannel(kafkaTemplate, containerFactory, "channel-recordings"); channel.setGroupId("recordings"); + List interceptors = new ArrayList<>(1); + interceptors.add(messageKeyInterceptor); + channel.setInterceptors(interceptors); return channel; } @@ -105,11 +113,15 @@ public class SplitterApplication @Bean SubscribableKafkaChannel words( KafkaTemplate kafkaTemplate, - KafkaListenerContainerFactory> containerFactory) + KafkaListenerContainerFactory> containerFactory, + ChannelInterceptor messageKeyInterceptor) { SubscribableKafkaChannel channel = new SubscribableKafkaChannel(kafkaTemplate, containerFactory, "channel-words"); channel.setGroupId("words"); + List interceptors = new ArrayList<>(1); + interceptors.add(messageKeyInterceptor); + channel.setInterceptors(interceptors); return channel; } @@ -119,6 +131,22 @@ public class SplitterApplication return new DirectChannel(); } + @Bean + ChannelInterceptor messageKeyInterceptor() + { + return new ChannelInterceptor() { + @Override + public Message preSend(Message message, MessageChannel channel) + { + String key = message.getHeaders().get("key", String.class); + return MessageBuilder + .fromMessage(message) + .setHeader("kafka_messageKey", key) + .build(); + } + }; + } + @GlobalChannelInterceptor @Bean ChannelInterceptor globalLoggingWireTap(MessageChannel messageLog)