From: Kai Moritz <kai@juplo.de>
Date: Wed, 20 Jul 2022 19:02:42 +0000 (+0200)
Subject: splitter: 1.0.0-spring-ingetration - fixed the ordering of the messages
X-Git-Url: https://juplo.de/gitweb/?a=commitdiff_plain;h=refs%2Fheads%2Fsplitter-spring-integration;p=demos%2Fkafka%2Fwordcount

splitter: 1.0.0-spring-ingetration - fixed the ordering of the messages

* Added a `ChannelInterceptor` to the `KafkaSubscribableChannel`, that
  adds the value of the `key`-Header as Header `kafka_messageKey`).
* This fixes the ordering, because now, all words, that are split of from
  a recording of a user are send with the same key.
---

diff --git a/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterApplication.java b/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterApplication.java
index 3dcb443..cfe7a4a 100644
--- a/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterApplication.java
+++ b/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterApplication.java
@@ -28,11 +28,15 @@ import org.springframework.kafka.listener.KafkaMessageListenerContainer;
 import org.springframework.integration.kafka.channel.SubscribableKafkaChannel;
 import org.springframework.kafka.config.KafkaListenerContainerFactory;
 import org.springframework.kafka.listener.AbstractMessageListenerContainer;
+import org.springframework.messaging.Message;
 import org.springframework.messaging.MessageChannel;
 import org.springframework.messaging.MessageHandler;
 import org.springframework.messaging.support.ChannelInterceptor;
+import org.springframework.messaging.support.MessageBuilder;
 
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 
@@ -79,11 +83,15 @@ public class SplitterApplication
 	@Bean
 	SubscribableKafkaChannel recordings(
 			KafkaTemplate<String, String> kafkaTemplate,
-			KafkaListenerContainerFactory<AbstractMessageListenerContainer<String, String>> containerFactory)
+			KafkaListenerContainerFactory<AbstractMessageListenerContainer<String, String>> containerFactory,
+			ChannelInterceptor messageKeyInterceptor)
 	{
 		SubscribableKafkaChannel channel =
 				new SubscribableKafkaChannel(kafkaTemplate, containerFactory, "channel-recordings");
 		channel.setGroupId("recordings");
+		List<ChannelInterceptor> interceptors = new ArrayList<>(1);
+		interceptors.add(messageKeyInterceptor);
+		channel.setInterceptors(interceptors);
 		return channel;
 	}
 
@@ -105,11 +113,15 @@ public class SplitterApplication
 	@Bean
 	SubscribableKafkaChannel words(
 			KafkaTemplate<String, String> kafkaTemplate,
-			KafkaListenerContainerFactory<AbstractMessageListenerContainer<String, String>> containerFactory)
+			KafkaListenerContainerFactory<AbstractMessageListenerContainer<String, String>> containerFactory,
+			ChannelInterceptor messageKeyInterceptor)
 	{
 		SubscribableKafkaChannel channel =
 				new SubscribableKafkaChannel(kafkaTemplate, containerFactory, "channel-words");
 		channel.setGroupId("words");
+		List<ChannelInterceptor> interceptors = new ArrayList<>(1);
+		interceptors.add(messageKeyInterceptor);
+		channel.setInterceptors(interceptors);
 		return channel;
 	}
 
@@ -119,6 +131,22 @@ public class SplitterApplication
 		return new DirectChannel();
 	}
 
+	@Bean
+	ChannelInterceptor messageKeyInterceptor()
+	{
+		return new ChannelInterceptor() {
+			@Override
+			public Message<?> preSend(Message<?> message, MessageChannel channel)
+			{
+				String key = message.getHeaders().get("key", String.class);
+				return MessageBuilder
+						.fromMessage(message)
+						.setHeader("kafka_messageKey", key)
+						.build();
+			}
+		};
+	}
+
 	@GlobalChannelInterceptor
 	@Bean
 	ChannelInterceptor globalLoggingWireTap(MessageChannel messageLog)