+ @Bean
+ MessageChannel words()
+ {
+ DirectChannel words = new DirectChannel();
+ words.addInterceptor(new ChannelInterceptor()
+ {
+ @Override
+ public Message<?> preSend(Message<?> message, MessageChannel channel)
+ {
+ MessageHeaders headers = message.getHeaders();
+ Object key = headers.get(KafkaHeaders.RECEIVED_MESSAGE_KEY);
+ return
+ MessageBuilder
+ .fromMessage(message)
+ .setHeader(KafkaHeaders.MESSAGE_KEY, key)
+ .build();
+ }
+ });
+ return words;
+ }
+