WIP
authorKai Moritz <kai@juplo.de>
Mon, 18 Jul 2022 19:29:25 +0000 (21:29 +0200)
committerKai Moritz <kai@juplo.de>
Mon, 18 Jul 2022 19:29:25 +0000 (21:29 +0200)
src/main/java/de/juplo/kafka/wordcount/splitter/SplitterApplication.java

index a0d4f1c..ea1fb17 100644 (file)
@@ -70,21 +70,7 @@ public class SplitterApplication
        }
 
        @Bean
-       @Transformer(inputChannel = "words", outputChannel = "kafka-out")
-       public HeaderEnricher kafkaOutHeaderEnricher()
-       {
-               Map<String, HeaderValueMessageProcessor<?>> headersToAdd = new HashMap<>();
-               Expression expression =
-                               new SpelExpressionParser().parseExpression("headers['kafka_receivedMessageKey']");
-               headersToAdd.put(
-                               "key",
-                               new ExpressionEvaluatingHeaderValueMessageProcessor<>(expression, String.class));
-               HeaderEnricher enricher = new HeaderEnricher(headersToAdd);
-               return enricher;
-       }
-
-       @Bean
-       @ServiceActivator(inputChannel = "kafka-out")
+       @ServiceActivator(inputChannel = "words")
        MessageHandler handler(
                        KafkaTemplate<String, String> kafkaTemplate,
                        SplitterApplicationProperties properties)
@@ -93,7 +79,7 @@ public class SplitterApplication
                                new KafkaProducerMessageHandler<>(kafkaTemplate);
                handler.setTopicExpression(new LiteralExpression(properties.getOutputTopic()));
                final ExpressionParser parser = new SpelExpressionParser();
-               Expression expression = parser.parseExpression("headers['kafka_receivedMessageKey']");
+               Expression expression = parser.parseExpression("headers['key']");
                handler.setMessageKeyExpression(expression);
                return handler;
        }