X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fwordcount%2Fsplitter%2FSplitterApplication.java;h=8bd928b555b743e4af8be9c20db42143fd746083;hb=26fd715128501924feb9c056f785db33e2ecbb71;hp=0a70551d18e60499d8009b53f2e762c3f171f388;hpb=7b017bc9954ecf26af755d43dd02999468cd94ea;p=demos%2Fkafka%2Fwordcount diff --git a/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterApplication.java b/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterApplication.java index 0a70551..8bd928b 100644 --- a/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterApplication.java +++ b/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterApplication.java @@ -4,23 +4,27 @@ import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Bean; +import org.springframework.expression.Expression; +import org.springframework.expression.ExpressionParser; import org.springframework.expression.common.LiteralExpression; -import org.springframework.integration.annotation.InboundChannelAdapter; +import org.springframework.expression.spel.standard.SpelExpressionParser; import org.springframework.integration.annotation.ServiceActivator; -import org.springframework.integration.channel.DirectChannel; +import org.springframework.integration.annotation.Transformer; import org.springframework.integration.config.EnableIntegration; -import org.springframework.integration.kafka.inbound.KafkaMessageSource; +import org.springframework.integration.core.MessageProducer; +import org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter; import org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler; -import org.springframework.integration.support.MessageBuilder; +import org.springframework.integration.transformer.HeaderEnricher; +import org.springframework.integration.transformer.support.ExpressionEvaluatingHeaderValueMessageProcessor; +import org.springframework.integration.transformer.support.HeaderValueMessageProcessor; import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.core.KafkaTemplate; -import org.springframework.kafka.listener.ConsumerProperties; -import org.springframework.kafka.support.KafkaHeaders; -import org.springframework.messaging.Message; -import org.springframework.messaging.MessageChannel; +import org.springframework.kafka.listener.ContainerProperties; +import org.springframework.kafka.listener.KafkaMessageListenerContainer; import org.springframework.messaging.MessageHandler; -import org.springframework.messaging.MessageHeaders; -import org.springframework.messaging.support.ChannelInterceptor; + +import java.util.HashMap; +import java.util.Map; @SpringBootApplication @@ -29,33 +33,38 @@ import org.springframework.messaging.support.ChannelInterceptor; public class SplitterApplication { @Bean - MessageChannel words() + MessageProducer messageProducer( + KafkaMessageListenerContainer messageListenerContainer) + { + KafkaMessageDrivenChannelAdapter adapter = + new KafkaMessageDrivenChannelAdapter(messageListenerContainer); + adapter.setOutputChannelName("kafka-in"); + return adapter; + } + + @Bean + @Transformer(inputChannel = "kafka-in", outputChannel = "recordings") + public HeaderEnricher kafkaInHeaderEnricher() { - DirectChannel words = new DirectChannel(); - words.addInterceptor(new ChannelInterceptor() - { - @Override - public Message preSend(Message message, MessageChannel channel) - { - MessageHeaders headers = message.getHeaders(); - Object key = headers.get(KafkaHeaders.RECEIVED_MESSAGE_KEY); - return - MessageBuilder - .fromMessage(message) - .setHeader(KafkaHeaders.MESSAGE_KEY, key) - .build(); - } - }); - return words; + Map> headersToAdd = new HashMap<>(); + Expression expression = + new SpelExpressionParser().parseExpression("headers['kafka_receivedMessageKey']"); + headersToAdd.put( + "key", + new ExpressionEvaluatingHeaderValueMessageProcessor<>(expression, String.class)); + HeaderEnricher enricher = new HeaderEnricher(headersToAdd); + return enricher; } - @InboundChannelAdapter(channel = "recordings") @Bean - KafkaMessageSource source( - ConsumerFactory cf, + KafkaMessageListenerContainer messageListenerContainer( + ConsumerFactory consumerFactory, SplitterApplicationProperties properties) { - return new KafkaMessageSource<>(cf, new ConsumerProperties(properties.getInputTopic())); + return + new KafkaMessageListenerContainer<>( + consumerFactory, + new ContainerProperties(properties.getInputTopic())); } @Bean @@ -67,6 +76,9 @@ public class SplitterApplication KafkaProducerMessageHandler handler = new KafkaProducerMessageHandler<>(kafkaTemplate); handler.setTopicExpression(new LiteralExpression(properties.getOutputTopic())); + final ExpressionParser parser = new SpelExpressionParser(); + Expression expression = parser.parseExpression("headers['key']"); + handler.setMessageKeyExpression(expression); return handler; }