import org.springframework.integration.kafka.channel.SubscribableKafkaChannel;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.listener.AbstractMessageListenerContainer;
+import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.support.ChannelInterceptor;
+import org.springframework.messaging.support.MessageBuilder;
+import java.util.ArrayList;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
@Bean
SubscribableKafkaChannel recordings(
KafkaTemplate<String, String> kafkaTemplate,
- KafkaListenerContainerFactory<AbstractMessageListenerContainer<String, String>> containerFactory)
+ KafkaListenerContainerFactory<AbstractMessageListenerContainer<String, String>> containerFactory,
+ ChannelInterceptor messageKeyInterceptor)
{
SubscribableKafkaChannel channel =
new SubscribableKafkaChannel(kafkaTemplate, containerFactory, "channel-recordings");
channel.setGroupId("recordings");
+ List<ChannelInterceptor> interceptors = new ArrayList<>(1);
+ interceptors.add(messageKeyInterceptor);
+ channel.setInterceptors(interceptors);
return channel;
}
@Bean
SubscribableKafkaChannel words(
KafkaTemplate<String, String> kafkaTemplate,
- KafkaListenerContainerFactory<AbstractMessageListenerContainer<String, String>> containerFactory)
+ KafkaListenerContainerFactory<AbstractMessageListenerContainer<String, String>> containerFactory,
+ ChannelInterceptor messageKeyInterceptor)
{
SubscribableKafkaChannel channel =
new SubscribableKafkaChannel(kafkaTemplate, containerFactory, "channel-words");
channel.setGroupId("words");
+ List<ChannelInterceptor> interceptors = new ArrayList<>(1);
+ interceptors.add(messageKeyInterceptor);
+ channel.setInterceptors(interceptors);
return channel;
}
return new DirectChannel();
}
+ @Bean
+ ChannelInterceptor messageKeyInterceptor()
+ {
+ return new ChannelInterceptor() {
+ @Override
+ public Message<?> preSend(Message<?> message, MessageChannel channel)
+ {
+ String key = message.getHeaders().get("key", String.class);
+ return MessageBuilder
+ .fromMessage(message)
+ .setHeader("kafka_messageKey", key)
+ .build();
+ }
+ };
+ }
+
@GlobalChannelInterceptor
@Bean
ChannelInterceptor globalLoggingWireTap(MessageChannel messageLog)