From: Kai Moritz Date: Mon, 18 Jul 2022 19:28:51 +0000 (+0200) Subject: WIP X-Git-Tag: wip-header-enricher~2 X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=1a1c7b33c40d7a4aa05bb47fcd2178217edff70f;p=demos%2Fkafka%2Fwordcount WIP --- diff --git a/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterApplication.java b/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterApplication.java index 750fbeb..a0d4f1c 100644 --- a/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterApplication.java +++ b/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterApplication.java @@ -46,7 +46,7 @@ public class SplitterApplication @Bean @Transformer(inputChannel = "kafka-in", outputChannel = "recordings") - public HeaderEnricher headerEnricher() + public HeaderEnricher kafkaInHeaderEnricher() { Map> headersToAdd = new HashMap<>(); Expression expression = @@ -70,7 +70,21 @@ public class SplitterApplication } @Bean - @ServiceActivator(inputChannel = "words") + @Transformer(inputChannel = "words", outputChannel = "kafka-out") + public HeaderEnricher kafkaOutHeaderEnricher() + { + Map> headersToAdd = new HashMap<>(); + Expression expression = + new SpelExpressionParser().parseExpression("headers['kafka_receivedMessageKey']"); + headersToAdd.put( + "key", + new ExpressionEvaluatingHeaderValueMessageProcessor<>(expression, String.class)); + HeaderEnricher enricher = new HeaderEnricher(headersToAdd); + return enricher; + } + + @Bean + @ServiceActivator(inputChannel = "kafka-out") MessageHandler handler( KafkaTemplate kafkaTemplate, SplitterApplicationProperties properties)