splitter: 1.0.0-spring-ingetration - added an interceptor for logging
[demos/kafka/wordcount] / src / main / java / de / juplo / kafka / wordcount / splitter / SplitterApplication.java
index 7087ed1..3dcb443 100644 (file)
@@ -10,8 +10,12 @@ import org.springframework.expression.common.LiteralExpression;
 import org.springframework.expression.spel.standard.SpelExpressionParser;
 import org.springframework.integration.annotation.ServiceActivator;
 import org.springframework.integration.annotation.Transformer;
+import org.springframework.integration.channel.DirectChannel;
+import org.springframework.integration.channel.interceptor.WireTap;
 import org.springframework.integration.config.EnableIntegration;
+import org.springframework.integration.config.GlobalChannelInterceptor;
 import org.springframework.integration.core.MessageProducer;
+import org.springframework.integration.handler.LoggingHandler;
 import org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter;
 import org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler;
 import org.springframework.integration.transformer.HeaderEnricher;
@@ -24,7 +28,9 @@ import org.springframework.kafka.listener.KafkaMessageListenerContainer;
 import org.springframework.integration.kafka.channel.SubscribableKafkaChannel;
 import org.springframework.kafka.config.KafkaListenerContainerFactory;
 import org.springframework.kafka.listener.AbstractMessageListenerContainer;
+import org.springframework.messaging.MessageChannel;
 import org.springframework.messaging.MessageHandler;
+import org.springframework.messaging.support.ChannelInterceptor;
 
 import java.util.HashMap;
 import java.util.Map;
@@ -107,6 +113,30 @@ public class SplitterApplication
                return channel;
        }
 
+       @Bean
+       MessageChannel messageLog()
+       {
+               return new DirectChannel();
+       }
+
+       @GlobalChannelInterceptor
+       @Bean
+       ChannelInterceptor globalLoggingWireTap(MessageChannel messageLog)
+       {
+               return new WireTap(messageLog);
+       }
+
+       @Bean
+       @ServiceActivator(inputChannel = "messageLog")
+       public LoggingHandler logging()
+       {
+               LoggingHandler adapter = new LoggingHandler(LoggingHandler.Level.DEBUG);
+               adapter.setLoggerName("MESSAGE_LOG");
+               adapter.setLogExpressionString("headers.id + ': ' + payload + ', headers=' + headers");
+               return adapter;
+       }
+
+
        public static void main(String[] args)
        {
                SpringApplication.run(SplitterApplication.class, args);