WIP
authorKai Moritz <kai@juplo.de>
Mon, 18 Jul 2022 19:28:51 +0000 (21:28 +0200)
committerKai Moritz <kai@juplo.de>
Mon, 18 Jul 2022 19:28:51 +0000 (21:28 +0200)
src/main/java/de/juplo/kafka/wordcount/splitter/SplitterApplication.java

index 750fbeb..a0d4f1c 100644 (file)
@@ -46,7 +46,7 @@ public class SplitterApplication
 
        @Bean
        @Transformer(inputChannel = "kafka-in", outputChannel = "recordings")
-       public HeaderEnricher headerEnricher()
+       public HeaderEnricher kafkaInHeaderEnricher()
        {
                Map<String, HeaderValueMessageProcessor<?>> headersToAdd = new HashMap<>();
                Expression expression =
@@ -70,7 +70,21 @@ public class SplitterApplication
        }
 
        @Bean
-       @ServiceActivator(inputChannel = "words")
+       @Transformer(inputChannel = "words", outputChannel = "kafka-out")
+       public HeaderEnricher kafkaOutHeaderEnricher()
+       {
+               Map<String, HeaderValueMessageProcessor<?>> headersToAdd = new HashMap<>();
+               Expression expression =
+                               new SpelExpressionParser().parseExpression("headers['kafka_receivedMessageKey']");
+               headersToAdd.put(
+                               "key",
+                               new ExpressionEvaluatingHeaderValueMessageProcessor<>(expression, String.class));
+               HeaderEnricher enricher = new HeaderEnricher(headersToAdd);
+               return enricher;
+       }
+
+       @Bean
+       @ServiceActivator(inputChannel = "kafka-out")
        MessageHandler handler(
                        KafkaTemplate<String, String> kafkaTemplate,
                        SplitterApplicationProperties properties)