- @Bean
- MessageChannel words(ChannelInterceptor keyInterceptor)
- {
- DirectChannel words = new DirectChannel();
- words.addInterceptor(keyInterceptor);
- return words;
- }
-
- @Bean
- ChannelInterceptor keyInterceptor ()
- {
- return new ChannelInterceptor()
- {
- @Override
- public Message<?> preSend(Message<?> message, MessageChannel channel)
- {
- MessageHeaders headers = message.getHeaders();
- Object key = headers.get(KafkaHeaders.RECEIVED_MESSAGE_KEY);
- return
- MessageBuilder
- .fromMessage(message)
- .setHeader(KafkaHeaders.MESSAGE_KEY, key)
- .build();
- }
- };
- }
-