WIP
authorKai Moritz <kai@juplo.de>
Mon, 18 Jul 2022 19:20:10 +0000 (21:20 +0200)
committerKai Moritz <kai@juplo.de>
Mon, 18 Jul 2022 19:20:10 +0000 (21:20 +0200)
src/main/java/de/juplo/kafka/wordcount/splitter/SplitterApplication.java

index b5948c3..e3b58f2 100644 (file)
@@ -9,16 +9,25 @@ import org.springframework.expression.ExpressionParser;
 import org.springframework.expression.common.LiteralExpression;
 import org.springframework.expression.spel.standard.SpelExpressionParser;
 import org.springframework.integration.annotation.ServiceActivator;
+import org.springframework.integration.annotation.Transformer;
 import org.springframework.integration.config.EnableIntegration;
 import org.springframework.integration.core.MessageProducer;
 import org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter;
 import org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler;
+import org.springframework.integration.transformer.HeaderEnricher;
+import org.springframework.integration.transformer.support.ExpressionEvaluatingHeaderValueMessageProcessor;
+import org.springframework.integration.transformer.support.HeaderValueMessageProcessor;
+import org.springframework.integration.transformer.support.StaticHeaderValueMessageProcessor;
 import org.springframework.kafka.core.ConsumerFactory;
 import org.springframework.kafka.core.KafkaTemplate;
 import org.springframework.kafka.listener.ContainerProperties;
 import org.springframework.kafka.listener.KafkaMessageListenerContainer;
 import org.springframework.messaging.MessageHandler;
 
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
 
 @SpringBootApplication
 @EnableConfigurationProperties(SplitterApplicationProperties.class)
@@ -31,10 +40,24 @@ public class SplitterApplication
        {
                KafkaMessageDrivenChannelAdapter<String, String> adapter =
                                new KafkaMessageDrivenChannelAdapter(messageListenerContainer);
-               adapter.setOutputChannelName("recordings");
+               adapter.setOutputChannelName("kafka-in");
                return adapter;
        }
 
+       @Bean
+       @Transformer(inputChannel = "kafka-in", outputChannel = "recordings")
+       public HeaderEnricher headerEnricher()
+       {
+               Map<String, HeaderValueMessageProcessor<?>> headersToAdd = new HashMap<>();
+               Expression expression =
+                               new SpelExpressionParser().parseExpression("headers['kafka_receivedMessageKey']");
+               headersToAdd.put(
+                               "key",
+                               new ExpressionEvaluatingHeaderValueMessageProcessor<>(expression, String.class));
+               HeaderEnricher enricher = new HeaderEnricher(headersToAdd);
+               return enricher;
+       }
+
        @Bean
        KafkaMessageListenerContainer<String, String> messageListenerContainer(
                        ConsumerFactory<String,String> consumerFactory,