From: Kai Moritz Date: Mon, 18 Jul 2022 20:48:26 +0000 (+0200) Subject: splitter: 1.0.0-spring-ingetration - added an interceptor for logging X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=f81962e03ef5ada8426fb069073ae38e877dd4ac;p=demos%2Fkafka%2Fwordcount splitter: 1.0.0-spring-ingetration - added an interceptor for logging * Added a global interceptor, that uses `WireTap` to copy the messages to a special channel for logging, named `messageLog`. * Added a `LoggingHandler`, that logs the messages, that are accumulated in the new channel `messageLog`. --- diff --git a/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterApplication.java b/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterApplication.java index 7087ed1..3dcb443 100644 --- a/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterApplication.java +++ b/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterApplication.java @@ -10,8 +10,12 @@ import org.springframework.expression.common.LiteralExpression; import org.springframework.expression.spel.standard.SpelExpressionParser; import org.springframework.integration.annotation.ServiceActivator; import org.springframework.integration.annotation.Transformer; +import org.springframework.integration.channel.DirectChannel; +import org.springframework.integration.channel.interceptor.WireTap; import org.springframework.integration.config.EnableIntegration; +import org.springframework.integration.config.GlobalChannelInterceptor; import org.springframework.integration.core.MessageProducer; +import org.springframework.integration.handler.LoggingHandler; import org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter; import org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler; import org.springframework.integration.transformer.HeaderEnricher; @@ -24,7 +28,9 @@ import org.springframework.kafka.listener.KafkaMessageListenerContainer; import org.springframework.integration.kafka.channel.SubscribableKafkaChannel; import org.springframework.kafka.config.KafkaListenerContainerFactory; import org.springframework.kafka.listener.AbstractMessageListenerContainer; +import org.springframework.messaging.MessageChannel; import org.springframework.messaging.MessageHandler; +import org.springframework.messaging.support.ChannelInterceptor; import java.util.HashMap; import java.util.Map; @@ -107,6 +113,30 @@ public class SplitterApplication return channel; } + @Bean + MessageChannel messageLog() + { + return new DirectChannel(); + } + + @GlobalChannelInterceptor + @Bean + ChannelInterceptor globalLoggingWireTap(MessageChannel messageLog) + { + return new WireTap(messageLog); + } + + @Bean + @ServiceActivator(inputChannel = "messageLog") + public LoggingHandler logging() + { + LoggingHandler adapter = new LoggingHandler(LoggingHandler.Level.DEBUG); + adapter.setLoggerName("MESSAGE_LOG"); + adapter.setLogExpressionString("headers.id + ': ' + payload + ', headers=' + headers"); + return adapter; + } + + public static void main(String[] args) { SpringApplication.run(SplitterApplication.class, args); diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties index 94e7492..f1be16b 100644 --- a/src/main/resources/application.properties +++ b/src/main/resources/application.properties @@ -1,5 +1,6 @@ server.port=8086 management.endpoints.web.exposure.include=* +logging.level.root=DEBUG logging.level.de.juplo.kafka.wordcount.splitter=DEBUG spring.kafka.consumer.auto-offset-reset. earliest spring.kafka.consumer.group-id=splitter