From 64790d0d04ccf94b728d1fac0ccbd45a4bc14178 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Mon, 18 Jul 2022 21:29:25 +0200 Subject: [PATCH] WIP --- .../splitter/SplitterApplication.java | 18 ++---------------- 1 file changed, 2 insertions(+), 16 deletions(-) diff --git a/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterApplication.java b/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterApplication.java index a0d4f1c..ea1fb17 100644 --- a/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterApplication.java +++ b/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterApplication.java @@ -70,21 +70,7 @@ public class SplitterApplication } @Bean - @Transformer(inputChannel = "words", outputChannel = "kafka-out") - public HeaderEnricher kafkaOutHeaderEnricher() - { - Map> headersToAdd = new HashMap<>(); - Expression expression = - new SpelExpressionParser().parseExpression("headers['kafka_receivedMessageKey']"); - headersToAdd.put( - "key", - new ExpressionEvaluatingHeaderValueMessageProcessor<>(expression, String.class)); - HeaderEnricher enricher = new HeaderEnricher(headersToAdd); - return enricher; - } - - @Bean - @ServiceActivator(inputChannel = "kafka-out") + @ServiceActivator(inputChannel = "words") MessageHandler handler( KafkaTemplate kafkaTemplate, SplitterApplicationProperties properties) @@ -93,7 +79,7 @@ public class SplitterApplication new KafkaProducerMessageHandler<>(kafkaTemplate); handler.setTopicExpression(new LiteralExpression(properties.getOutputTopic())); final ExpressionParser parser = new SpelExpressionParser(); - Expression expression = parser.parseExpression("headers['kafka_receivedMessageKey']"); + Expression expression = parser.parseExpression("headers['key']"); handler.setMessageKeyExpression(expression); return handler; } -- 2.20.1