+ @Bean
+ ChannelInterceptor keyInterceptor ()
+ {
+ return new ChannelInterceptor()
+ {
+ @Override
+ public Message<?> preSend(Message<?> message, MessageChannel channel)
+ {
+ MessageHeaders headers = message.getHeaders();
+ Object key = headers.get(KafkaHeaders.RECEIVED_MESSAGE_KEY);
+ return
+ MessageBuilder
+ .fromMessage(message)
+ .setHeader(KafkaHeaders.MESSAGE_KEY, key)
+ .build();
+ }
+ };
+ }
+