From 26fd715128501924feb9c056f785db33e2ecbb71 Mon Sep 17 00:00:00 2001
From: Kai Moritz <kai@juplo.de>
Date: Mon, 18 Jul 2022 21:20:10 +0200
Subject: [PATCH] splitter: 1.0.0-spring-ingetration - extract the key on the
 inbound channel

* The key of the Kafka-Message is extracted on the inbound channel with
  a `HeaderEnricher`.
* This way, the key is propagated to dreived messages automatically.
---
 .../splitter/SplitterApplication.java         | 25 +++++++++++++++++--
 1 file changed, 23 insertions(+), 2 deletions(-)

diff --git a/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterApplication.java b/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterApplication.java
index 3828050..8bd928b 100644
--- a/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterApplication.java
+++ b/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterApplication.java
@@ -9,16 +9,23 @@ import org.springframework.expression.ExpressionParser;
 import org.springframework.expression.common.LiteralExpression;
 import org.springframework.expression.spel.standard.SpelExpressionParser;
 import org.springframework.integration.annotation.ServiceActivator;
+import org.springframework.integration.annotation.Transformer;
 import org.springframework.integration.config.EnableIntegration;
 import org.springframework.integration.core.MessageProducer;
 import org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter;
 import org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler;
+import org.springframework.integration.transformer.HeaderEnricher;
+import org.springframework.integration.transformer.support.ExpressionEvaluatingHeaderValueMessageProcessor;
+import org.springframework.integration.transformer.support.HeaderValueMessageProcessor;
 import org.springframework.kafka.core.ConsumerFactory;
 import org.springframework.kafka.core.KafkaTemplate;
 import org.springframework.kafka.listener.ContainerProperties;
 import org.springframework.kafka.listener.KafkaMessageListenerContainer;
 import org.springframework.messaging.MessageHandler;
 
+import java.util.HashMap;
+import java.util.Map;
+
 
 @SpringBootApplication
 @EnableConfigurationProperties(SplitterApplicationProperties.class)
@@ -31,10 +38,24 @@ public class SplitterApplication
 	{
 		KafkaMessageDrivenChannelAdapter<String, String> adapter =
 				new KafkaMessageDrivenChannelAdapter(messageListenerContainer);
-		adapter.setOutputChannelName("recordings");
+		adapter.setOutputChannelName("kafka-in");
 		return adapter;
 	}
 
+	@Bean
+	@Transformer(inputChannel = "kafka-in", outputChannel = "recordings")
+	public HeaderEnricher kafkaInHeaderEnricher()
+	{
+		Map<String, HeaderValueMessageProcessor<?>> headersToAdd = new HashMap<>();
+		Expression expression =
+				new SpelExpressionParser().parseExpression("headers['kafka_receivedMessageKey']");
+		headersToAdd.put(
+				"key",
+				new ExpressionEvaluatingHeaderValueMessageProcessor<>(expression, String.class));
+		HeaderEnricher enricher = new HeaderEnricher(headersToAdd);
+		return enricher;
+	}
+
 	@Bean
 	KafkaMessageListenerContainer<String, String> messageListenerContainer(
 			ConsumerFactory<String,String> consumerFactory,
@@ -56,7 +77,7 @@ public class SplitterApplication
 				new KafkaProducerMessageHandler<>(kafkaTemplate);
 		handler.setTopicExpression(new LiteralExpression(properties.getOutputTopic()));
 		final ExpressionParser parser = new SpelExpressionParser();
-		Expression expression = parser.parseExpression("headers['kafka_receivedMessageKey']");
+		Expression expression = parser.parseExpression("headers['key']");
 		handler.setMessageKeyExpression(expression);
 		return handler;
 	}
-- 
2.20.1