splitter: 1.0.0-spring-ingetration - added an interceptor for logging
authorKai Moritz <kai@juplo.de>
Mon, 18 Jul 2022 20:48:26 +0000 (22:48 +0200)
committerKai Moritz <kai@juplo.de>
Wed, 20 Jul 2022 18:36:53 +0000 (20:36 +0200)
* Added a global interceptor, that uses `WireTap` to copy the messages
  to a special channel for logging, named `messageLog`.
* Added a `LoggingHandler`, that logs the messages, that are accumulated
  in the new channel `messageLog`.

src/main/java/de/juplo/kafka/wordcount/splitter/SplitterApplication.java
src/main/resources/application.properties

index 7087ed1..3dcb443 100644 (file)
@@ -10,8 +10,12 @@ import org.springframework.expression.common.LiteralExpression;
 import org.springframework.expression.spel.standard.SpelExpressionParser;
 import org.springframework.integration.annotation.ServiceActivator;
 import org.springframework.integration.annotation.Transformer;
+import org.springframework.integration.channel.DirectChannel;
+import org.springframework.integration.channel.interceptor.WireTap;
 import org.springframework.integration.config.EnableIntegration;
+import org.springframework.integration.config.GlobalChannelInterceptor;
 import org.springframework.integration.core.MessageProducer;
+import org.springframework.integration.handler.LoggingHandler;
 import org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter;
 import org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler;
 import org.springframework.integration.transformer.HeaderEnricher;
@@ -24,7 +28,9 @@ import org.springframework.kafka.listener.KafkaMessageListenerContainer;
 import org.springframework.integration.kafka.channel.SubscribableKafkaChannel;
 import org.springframework.kafka.config.KafkaListenerContainerFactory;
 import org.springframework.kafka.listener.AbstractMessageListenerContainer;
+import org.springframework.messaging.MessageChannel;
 import org.springframework.messaging.MessageHandler;
+import org.springframework.messaging.support.ChannelInterceptor;
 
 import java.util.HashMap;
 import java.util.Map;
@@ -107,6 +113,30 @@ public class SplitterApplication
                return channel;
        }
 
+       @Bean
+       MessageChannel messageLog()
+       {
+               return new DirectChannel();
+       }
+
+       @GlobalChannelInterceptor
+       @Bean
+       ChannelInterceptor globalLoggingWireTap(MessageChannel messageLog)
+       {
+               return new WireTap(messageLog);
+       }
+
+       @Bean
+       @ServiceActivator(inputChannel = "messageLog")
+       public LoggingHandler logging()
+       {
+               LoggingHandler adapter = new LoggingHandler(LoggingHandler.Level.DEBUG);
+               adapter.setLoggerName("MESSAGE_LOG");
+               adapter.setLogExpressionString("headers.id + ': ' + payload + ', headers=' + headers");
+               return adapter;
+       }
+
+
        public static void main(String[] args)
        {
                SpringApplication.run(SplitterApplication.class, args);
index 94e7492..f1be16b 100644 (file)
@@ -1,5 +1,6 @@
 server.port=8086
 management.endpoints.web.exposure.include=*
+logging.level.root=DEBUG
 logging.level.de.juplo.kafka.wordcount.splitter=DEBUG
 spring.kafka.consumer.auto-offset-reset. earliest
 spring.kafka.consumer.group-id=splitter