+ @Bean
+ SubscribableKafkaChannel words(
+ KafkaTemplate<String, String> kafkaTemplate,
+ KafkaListenerContainerFactory<AbstractMessageListenerContainer<String, String>> containerFactory,
+ ChannelInterceptor messageKeyInterceptor)
+ {
+ SubscribableKafkaChannel channel =
+ new SubscribableKafkaChannel(kafkaTemplate, containerFactory, "channel-words");
+ channel.setGroupId("words");
+ List<ChannelInterceptor> interceptors = new ArrayList<>(1);
+ interceptors.add(messageKeyInterceptor);
+ channel.setInterceptors(interceptors);
+ return channel;
+ }
+
+ @Bean
+ MessageChannel messageLog()
+ {
+ return new DirectChannel();
+ }
+
+ @Bean
+ ChannelInterceptor messageKeyInterceptor()
+ {
+ return new ChannelInterceptor() {
+ @Override
+ public Message<?> preSend(Message<?> message, MessageChannel channel)
+ {
+ String key = message.getHeaders().get("key", String.class);
+ return MessageBuilder
+ .fromMessage(message)
+ .setHeader("kafka_messageKey", key)
+ .build();
+ }
+ };
+ }
+
+ @GlobalChannelInterceptor
+ @Bean
+ ChannelInterceptor globalLoggingWireTap(MessageChannel messageLog)
+ {
+ return new WireTap(messageLog);
+ }
+
+ @Bean
+ @ServiceActivator(inputChannel = "messageLog")
+ public LoggingHandler logging()
+ {
+ LoggingHandler adapter = new LoggingHandler(LoggingHandler.Level.DEBUG);
+ adapter.setLoggerName("MESSAGE_LOG");
+ adapter.setLogExpressionString("headers.id + ': ' + payload + ', headers=' + headers");
+ return adapter;
+ }
+
+