From: Kai Moritz Date: Sun, 17 Jul 2022 15:18:40 +0000 (+0200) Subject: Merge branch 'splitter-spring-integration' into splitter-spring-integration-xml X-Git-Tag: wip-outbound-channel-adapter~10 X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=feabe2aba3ac3f1b5e2bad4130198ca112e66776;p=demos%2Fkafka%2Fwordcount Merge branch 'splitter-spring-integration' into splitter-spring-integration-xml * The configuration can be simplified * The `ChannelInterceptor` can be omitted, if a `message-key-expression` is specivied for the outbound channel. * The explicit definition of the `DirectChannel` is not needed any more. --- feabe2aba3ac3f1b5e2bad4130198ca112e66776 diff --cc src/main/java/de/juplo/kafka/wordcount/splitter/SplitterApplication.java index fd3becd,246fffd..3d4b48f --- a/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterApplication.java +++ b/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterApplication.java @@@ -4,8 -4,10 +4,11 @@@ import org.springframework.boot.SpringA import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.ImportResource; + import org.springframework.expression.Expression; + import org.springframework.expression.ExpressionParser; import org.springframework.expression.common.LiteralExpression; + import org.springframework.expression.spel.standard.SpelExpressionParser; import org.springframework.integration.annotation.InboundChannelAdapter; import org.springframework.integration.annotation.ServiceActivator; import org.springframework.integration.config.EnableIntegration; @@@ -26,28 -22,8 +23,9 @@@ import org.springframework.messaging.Me @SpringBootApplication @EnableConfigurationProperties(SplitterApplicationProperties.class) @EnableIntegration +@ImportResource("classpath:integration.xml") public class SplitterApplication { - @Bean - ChannelInterceptor keyInterceptor () - { - return new ChannelInterceptor() - { - @Override - public Message preSend(Message message, MessageChannel channel) - { - MessageHeaders headers = message.getHeaders(); - Object key = headers.get(KafkaHeaders.RECEIVED_MESSAGE_KEY); - return - MessageBuilder - .fromMessage(message) - .setHeader(KafkaHeaders.MESSAGE_KEY, key) - .build(); - } - }; - } - @InboundChannelAdapter(channel = "recordings") @Bean KafkaMessageSource source( diff --cc src/main/resources/integration.xml index c632967,0000000..9872fbe mode 100644,000000..100644 --- a/src/main/resources/integration.xml +++ b/src/main/resources/integration.xml @@@ -1,15 -1,0 +1,7 @@@ + + - - - - - - ++ http://www.springframework.org/schema/beans https://www.springframework.org/schema/beans/spring-beans.xsd"> + +