* Added a global interceptor, that uses `WireTap` to copy the messages
to a special channel for logging, named `messageLog`.
* Added a `LoggingHandler`, that logs the messages, that are accumulated
in the new channel `messageLog`.
import org.springframework.expression.spel.standard.SpelExpressionParser;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.annotation.Transformer;
import org.springframework.expression.spel.standard.SpelExpressionParser;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.annotation.Transformer;
+import org.springframework.integration.channel.DirectChannel;
+import org.springframework.integration.channel.interceptor.WireTap;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.config.EnableIntegration;
+import org.springframework.integration.config.GlobalChannelInterceptor;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.core.MessageProducer;
+import org.springframework.integration.handler.LoggingHandler;
import org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter;
import org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler;
import org.springframework.integration.transformer.HeaderEnricher;
import org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter;
import org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler;
import org.springframework.integration.transformer.HeaderEnricher;
import org.springframework.integration.kafka.channel.SubscribableKafkaChannel;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.listener.AbstractMessageListenerContainer;
import org.springframework.integration.kafka.channel.SubscribableKafkaChannel;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.listener.AbstractMessageListenerContainer;
+import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessageHandler;
+import org.springframework.messaging.support.ChannelInterceptor;
import java.util.HashMap;
import java.util.Map;
import java.util.HashMap;
import java.util.Map;
+ @Bean
+ MessageChannel messageLog()
+ {
+ return new DirectChannel();
+ }
+
+ @GlobalChannelInterceptor
+ @Bean
+ ChannelInterceptor globalLoggingWireTap(MessageChannel messageLog)
+ {
+ return new WireTap(messageLog);
+ }
+
+ @Bean
+ @ServiceActivator(inputChannel = "messageLog")
+ public LoggingHandler logging()
+ {
+ LoggingHandler adapter = new LoggingHandler(LoggingHandler.Level.DEBUG);
+ adapter.setLoggerName("MESSAGE_LOG");
+ adapter.setLogExpressionString("headers.id + ': ' + payload + ', headers=' + headers");
+ return adapter;
+ }
+
+
public static void main(String[] args)
{
SpringApplication.run(SplitterApplication.class, args);
public static void main(String[] args)
{
SpringApplication.run(SplitterApplication.class, args);
server.port=8086
management.endpoints.web.exposure.include=*
server.port=8086
management.endpoints.web.exposure.include=*
+logging.level.root=DEBUG
logging.level.de.juplo.kafka.wordcount.splitter=DEBUG
spring.kafka.consumer.auto-offset-reset. earliest
spring.kafka.consumer.group-id=splitter
logging.level.de.juplo.kafka.wordcount.splitter=DEBUG
spring.kafka.consumer.auto-offset-reset. earliest
spring.kafka.consumer.group-id=splitter