splitter: 1.0.0-spring-integration-xml - defined the `DirectChannel` in XML
[demos/kafka/wordcount] / src / main / java / de / juplo / kafka / wordcount / splitter / SplitterApplication.java
index 412f429..fd3becd 100644 (file)
@@ -4,23 +4,50 @@ import org.springframework.boot.SpringApplication;
 import org.springframework.boot.autoconfigure.SpringBootApplication;
 import org.springframework.boot.context.properties.EnableConfigurationProperties;
 import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.ImportResource;
 import org.springframework.expression.common.LiteralExpression;
 import org.springframework.integration.annotation.InboundChannelAdapter;
 import org.springframework.integration.annotation.ServiceActivator;
 import org.springframework.integration.config.EnableIntegration;
 import org.springframework.integration.kafka.inbound.KafkaMessageSource;
 import org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler;
+import org.springframework.integration.support.MessageBuilder;
 import org.springframework.kafka.core.ConsumerFactory;
 import org.springframework.kafka.core.KafkaTemplate;
 import org.springframework.kafka.listener.ConsumerProperties;
+import org.springframework.kafka.support.KafkaHeaders;
+import org.springframework.messaging.Message;
+import org.springframework.messaging.MessageChannel;
 import org.springframework.messaging.MessageHandler;
+import org.springframework.messaging.MessageHeaders;
+import org.springframework.messaging.support.ChannelInterceptor;
 
 
 @SpringBootApplication
 @EnableConfigurationProperties(SplitterApplicationProperties.class)
 @EnableIntegration
+@ImportResource("classpath:integration.xml")
 public class SplitterApplication
 {
+       @Bean
+       ChannelInterceptor keyInterceptor ()
+       {
+               return new ChannelInterceptor()
+               {
+                       @Override
+                       public Message<?> preSend(Message<?> message, MessageChannel channel)
+                       {
+                               MessageHeaders headers = message.getHeaders();
+                               Object key = headers.get(KafkaHeaders.RECEIVED_MESSAGE_KEY);
+                               return
+                                               MessageBuilder
+                                                               .fromMessage(message)
+                                                               .setHeader(KafkaHeaders.MESSAGE_KEY, key)
+                                                               .build();
+                       }
+               };
+       }
+
        @InboundChannelAdapter(channel = "recordings")
        @Bean
        KafkaMessageSource<String, String> source(