X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fwordcount%2Fsplitter%2FSplitterApplication.java;h=8bd928b555b743e4af8be9c20db42143fd746083;hb=26fd715128501924feb9c056f785db33e2ecbb71;hp=3828050e00aa90529924cfc0f65d49568e2da254;hpb=5c1e4e203c3c0d7c07b4ce1293a4a120faf1db3c;p=demos%2Fkafka%2Fwordcount diff --git a/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterApplication.java b/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterApplication.java index 3828050..8bd928b 100644 --- a/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterApplication.java +++ b/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterApplication.java @@ -9,16 +9,23 @@ import org.springframework.expression.ExpressionParser; import org.springframework.expression.common.LiteralExpression; import org.springframework.expression.spel.standard.SpelExpressionParser; import org.springframework.integration.annotation.ServiceActivator; +import org.springframework.integration.annotation.Transformer; import org.springframework.integration.config.EnableIntegration; import org.springframework.integration.core.MessageProducer; import org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter; import org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler; +import org.springframework.integration.transformer.HeaderEnricher; +import org.springframework.integration.transformer.support.ExpressionEvaluatingHeaderValueMessageProcessor; +import org.springframework.integration.transformer.support.HeaderValueMessageProcessor; import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.listener.ContainerProperties; import org.springframework.kafka.listener.KafkaMessageListenerContainer; import org.springframework.messaging.MessageHandler; +import java.util.HashMap; +import java.util.Map; + @SpringBootApplication @EnableConfigurationProperties(SplitterApplicationProperties.class) @@ -31,10 +38,24 @@ public class SplitterApplication { KafkaMessageDrivenChannelAdapter adapter = new KafkaMessageDrivenChannelAdapter(messageListenerContainer); - adapter.setOutputChannelName("recordings"); + adapter.setOutputChannelName("kafka-in"); return adapter; } + @Bean + @Transformer(inputChannel = "kafka-in", outputChannel = "recordings") + public HeaderEnricher kafkaInHeaderEnricher() + { + Map> headersToAdd = new HashMap<>(); + Expression expression = + new SpelExpressionParser().parseExpression("headers['kafka_receivedMessageKey']"); + headersToAdd.put( + "key", + new ExpressionEvaluatingHeaderValueMessageProcessor<>(expression, String.class)); + HeaderEnricher enricher = new HeaderEnricher(headersToAdd); + return enricher; + } + @Bean KafkaMessageListenerContainer messageListenerContainer( ConsumerFactory consumerFactory, @@ -56,7 +77,7 @@ public class SplitterApplication new KafkaProducerMessageHandler<>(kafkaTemplate); handler.setTopicExpression(new LiteralExpression(properties.getOutputTopic())); final ExpressionParser parser = new SpelExpressionParser(); - Expression expression = parser.parseExpression("headers['kafka_receivedMessageKey']"); + Expression expression = parser.parseExpression("headers['key']"); handler.setMessageKeyExpression(expression); return handler; }