From: Kai Moritz Date: Sun, 17 Jul 2022 15:11:03 +0000 (+0200) Subject: splitter: 1.0.0-spring-ingetration - using a `message-key-expression` X-Git-Tag: wip-outbound-channel-adapter~10^2 X-Git-Url: https://juplo.de/gitweb/?p=demos%2Fkafka%2Fwordcount;a=commitdiff_plain;h=9f82f66f1a9ed3194889fee1b69a10287c4b03db splitter: 1.0.0-spring-ingetration - using a `message-key-expression` * The configuration can be simplified * The `ChannelInterceptor` can be omitted, if a `message-key-expression` is specivied for the outbound channel. --- diff --git a/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterApplication.java b/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterApplication.java index d9ba135..246fffd 100644 --- a/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterApplication.java +++ b/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterApplication.java @@ -4,23 +4,19 @@ import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Bean; +import org.springframework.expression.Expression; +import org.springframework.expression.ExpressionParser; import org.springframework.expression.common.LiteralExpression; +import org.springframework.expression.spel.standard.SpelExpressionParser; import org.springframework.integration.annotation.InboundChannelAdapter; import org.springframework.integration.annotation.ServiceActivator; -import org.springframework.integration.channel.DirectChannel; import org.springframework.integration.config.EnableIntegration; import org.springframework.integration.kafka.inbound.KafkaMessageSource; import org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler; -import org.springframework.integration.support.MessageBuilder; import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.listener.ConsumerProperties; -import org.springframework.kafka.support.KafkaHeaders; -import org.springframework.messaging.Message; -import org.springframework.messaging.MessageChannel; import org.springframework.messaging.MessageHandler; -import org.springframework.messaging.MessageHeaders; -import org.springframework.messaging.support.ChannelInterceptor; @SpringBootApplication @@ -28,33 +24,6 @@ import org.springframework.messaging.support.ChannelInterceptor; @EnableIntegration public class SplitterApplication { - @Bean - MessageChannel words(ChannelInterceptor keyInterceptor) - { - DirectChannel words = new DirectChannel(); - words.addInterceptor(keyInterceptor); - return words; - } - - @Bean - ChannelInterceptor keyInterceptor () - { - return new ChannelInterceptor() - { - @Override - public Message preSend(Message message, MessageChannel channel) - { - MessageHeaders headers = message.getHeaders(); - Object key = headers.get(KafkaHeaders.RECEIVED_MESSAGE_KEY); - return - MessageBuilder - .fromMessage(message) - .setHeader(KafkaHeaders.MESSAGE_KEY, key) - .build(); - } - }; - } - @InboundChannelAdapter(channel = "recordings") @Bean KafkaMessageSource source( @@ -73,6 +42,9 @@ public class SplitterApplication KafkaProducerMessageHandler handler = new KafkaProducerMessageHandler<>(kafkaTemplate); handler.setTopicExpression(new LiteralExpression(properties.getOutputTopic())); + final ExpressionParser parser = new SpelExpressionParser(); + Expression expression = parser.parseExpression("headers['kafka_receivedMessageKey']"); + handler.setMessageKeyExpression(expression); return handler; }