From 1a1c7b33c40d7a4aa05bb47fcd2178217edff70f Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Mon, 18 Jul 2022 21:28:51 +0200 Subject: [PATCH] WIP --- .../splitter/SplitterApplication.java | 18 ++++++++++++++++-- 1 file changed, 16 insertions(+), 2 deletions(-) diff --git a/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterApplication.java b/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterApplication.java index 750fbeb..a0d4f1c 100644 --- a/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterApplication.java +++ b/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterApplication.java @@ -46,7 +46,7 @@ public class SplitterApplication @Bean @Transformer(inputChannel = "kafka-in", outputChannel = "recordings") - public HeaderEnricher headerEnricher() + public HeaderEnricher kafkaInHeaderEnricher() { Map> headersToAdd = new HashMap<>(); Expression expression = @@ -70,7 +70,21 @@ public class SplitterApplication } @Bean - @ServiceActivator(inputChannel = "words") + @Transformer(inputChannel = "words", outputChannel = "kafka-out") + public HeaderEnricher kafkaOutHeaderEnricher() + { + Map> headersToAdd = new HashMap<>(); + Expression expression = + new SpelExpressionParser().parseExpression("headers['kafka_receivedMessageKey']"); + headersToAdd.put( + "key", + new ExpressionEvaluatingHeaderValueMessageProcessor<>(expression, String.class)); + HeaderEnricher enricher = new HeaderEnricher(headersToAdd); + return enricher; + } + + @Bean + @ServiceActivator(inputChannel = "kafka-out") MessageHandler handler( KafkaTemplate kafkaTemplate, SplitterApplicationProperties properties) -- 2.20.1