+ @Bean
+ ChannelInterceptor messageKeyInterceptor()
+ {
+ return new ChannelInterceptor() {
+ @Override
+ public Message<?> preSend(Message<?> message, MessageChannel channel)
+ {
+ String key = message.getHeaders().get("key", String.class);
+ return MessageBuilder
+ .fromMessage(message)
+ .setHeader("kafka_messageKey", key)
+ .build();
+ }
+ };
+ }
+