splitter: 1.0.0-spring-ingetration - fixed the ordering of the messages
[demos/kafka/wordcount] / src / main / java / de / juplo / kafka / wordcount / splitter / SplitterApplication.java
index 7087ed1..cfe7a4a 100644 (file)
@@ -10,8 +10,12 @@ import org.springframework.expression.common.LiteralExpression;
 import org.springframework.expression.spel.standard.SpelExpressionParser;
 import org.springframework.integration.annotation.ServiceActivator;
 import org.springframework.integration.annotation.Transformer;
+import org.springframework.integration.channel.DirectChannel;
+import org.springframework.integration.channel.interceptor.WireTap;
 import org.springframework.integration.config.EnableIntegration;
+import org.springframework.integration.config.GlobalChannelInterceptor;
 import org.springframework.integration.core.MessageProducer;
+import org.springframework.integration.handler.LoggingHandler;
 import org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter;
 import org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler;
 import org.springframework.integration.transformer.HeaderEnricher;
@@ -24,9 +28,15 @@ import org.springframework.kafka.listener.KafkaMessageListenerContainer;
 import org.springframework.integration.kafka.channel.SubscribableKafkaChannel;
 import org.springframework.kafka.config.KafkaListenerContainerFactory;
 import org.springframework.kafka.listener.AbstractMessageListenerContainer;
+import org.springframework.messaging.Message;
+import org.springframework.messaging.MessageChannel;
 import org.springframework.messaging.MessageHandler;
+import org.springframework.messaging.support.ChannelInterceptor;
+import org.springframework.messaging.support.MessageBuilder;
 
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 
@@ -73,11 +83,15 @@ public class SplitterApplication
        @Bean
        SubscribableKafkaChannel recordings(
                        KafkaTemplate<String, String> kafkaTemplate,
-                       KafkaListenerContainerFactory<AbstractMessageListenerContainer<String, String>> containerFactory)
+                       KafkaListenerContainerFactory<AbstractMessageListenerContainer<String, String>> containerFactory,
+                       ChannelInterceptor messageKeyInterceptor)
        {
                SubscribableKafkaChannel channel =
                                new SubscribableKafkaChannel(kafkaTemplate, containerFactory, "channel-recordings");
                channel.setGroupId("recordings");
+               List<ChannelInterceptor> interceptors = new ArrayList<>(1);
+               interceptors.add(messageKeyInterceptor);
+               channel.setInterceptors(interceptors);
                return channel;
        }
 
@@ -99,14 +113,58 @@ public class SplitterApplication
        @Bean
        SubscribableKafkaChannel words(
                        KafkaTemplate<String, String> kafkaTemplate,
-                       KafkaListenerContainerFactory<AbstractMessageListenerContainer<String, String>> containerFactory)
+                       KafkaListenerContainerFactory<AbstractMessageListenerContainer<String, String>> containerFactory,
+                       ChannelInterceptor messageKeyInterceptor)
        {
                SubscribableKafkaChannel channel =
                                new SubscribableKafkaChannel(kafkaTemplate, containerFactory, "channel-words");
                channel.setGroupId("words");
+               List<ChannelInterceptor> interceptors = new ArrayList<>(1);
+               interceptors.add(messageKeyInterceptor);
+               channel.setInterceptors(interceptors);
                return channel;
        }
 
+       @Bean
+       MessageChannel messageLog()
+       {
+               return new DirectChannel();
+       }
+
+       @Bean
+       ChannelInterceptor messageKeyInterceptor()
+       {
+               return new ChannelInterceptor() {
+                       @Override
+                       public Message<?> preSend(Message<?> message, MessageChannel channel)
+                       {
+                               String key = message.getHeaders().get("key", String.class);
+                               return MessageBuilder
+                                               .fromMessage(message)
+                                               .setHeader("kafka_messageKey", key)
+                                               .build();
+                       }
+               };
+       }
+
+       @GlobalChannelInterceptor
+       @Bean
+       ChannelInterceptor globalLoggingWireTap(MessageChannel messageLog)
+       {
+               return new WireTap(messageLog);
+       }
+
+       @Bean
+       @ServiceActivator(inputChannel = "messageLog")
+       public LoggingHandler logging()
+       {
+               LoggingHandler adapter = new LoggingHandler(LoggingHandler.Level.DEBUG);
+               adapter.setLoggerName("MESSAGE_LOG");
+               adapter.setLogExpressionString("headers.id + ': ' + payload + ', headers=' + headers");
+               return adapter;
+       }
+
+
        public static void main(String[] args)
        {
                SpringApplication.run(SplitterApplication.class, args);