From: Kai Moritz Date: Mon, 18 Jul 2022 19:29:25 +0000 (+0200) Subject: WIP X-Git-Tag: wip-header-enricher~1 X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=64790d0d04ccf94b728d1fac0ccbd45a4bc14178;p=demos%2Fkafka%2Fwordcount WIP --- diff --git a/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterApplication.java b/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterApplication.java index a0d4f1c..ea1fb17 100644 --- a/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterApplication.java +++ b/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterApplication.java @@ -70,21 +70,7 @@ public class SplitterApplication } @Bean - @Transformer(inputChannel = "words", outputChannel = "kafka-out") - public HeaderEnricher kafkaOutHeaderEnricher() - { - Map> headersToAdd = new HashMap<>(); - Expression expression = - new SpelExpressionParser().parseExpression("headers['kafka_receivedMessageKey']"); - headersToAdd.put( - "key", - new ExpressionEvaluatingHeaderValueMessageProcessor<>(expression, String.class)); - HeaderEnricher enricher = new HeaderEnricher(headersToAdd); - return enricher; - } - - @Bean - @ServiceActivator(inputChannel = "kafka-out") + @ServiceActivator(inputChannel = "words") MessageHandler handler( KafkaTemplate kafkaTemplate, SplitterApplicationProperties properties) @@ -93,7 +79,7 @@ public class SplitterApplication new KafkaProducerMessageHandler<>(kafkaTemplate); handler.setTopicExpression(new LiteralExpression(properties.getOutputTopic())); final ExpressionParser parser = new SpelExpressionParser(); - Expression expression = parser.parseExpression("headers['kafka_receivedMessageKey']"); + Expression expression = parser.parseExpression("headers['key']"); handler.setMessageKeyExpression(expression); return handler; }