X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fwordcount%2Fsplitter%2FSplitterApplication.java;h=cfe7a4ade185239abc74c6f3d1c9efbfbdc08a29;hb=refs%2Fheads%2Fsplitter-spring-integration;hp=3dcb443b46370c03c2d9d81927e70c3f9f04b027;hpb=c16e7996d992ad58b93c4fc344d81a890dc21489;p=demos%2Fkafka%2Fwordcount diff --git a/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterApplication.java b/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterApplication.java index 3dcb443..cfe7a4a 100644 --- a/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterApplication.java +++ b/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterApplication.java @@ -28,11 +28,15 @@ import org.springframework.kafka.listener.KafkaMessageListenerContainer; import org.springframework.integration.kafka.channel.SubscribableKafkaChannel; import org.springframework.kafka.config.KafkaListenerContainerFactory; import org.springframework.kafka.listener.AbstractMessageListenerContainer; +import org.springframework.messaging.Message; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.MessageHandler; import org.springframework.messaging.support.ChannelInterceptor; +import org.springframework.messaging.support.MessageBuilder; +import java.util.ArrayList; import java.util.HashMap; +import java.util.List; import java.util.Map; @@ -79,11 +83,15 @@ public class SplitterApplication @Bean SubscribableKafkaChannel recordings( KafkaTemplate kafkaTemplate, - KafkaListenerContainerFactory> containerFactory) + KafkaListenerContainerFactory> containerFactory, + ChannelInterceptor messageKeyInterceptor) { SubscribableKafkaChannel channel = new SubscribableKafkaChannel(kafkaTemplate, containerFactory, "channel-recordings"); channel.setGroupId("recordings"); + List interceptors = new ArrayList<>(1); + interceptors.add(messageKeyInterceptor); + channel.setInterceptors(interceptors); return channel; } @@ -105,11 +113,15 @@ public class SplitterApplication @Bean SubscribableKafkaChannel words( KafkaTemplate kafkaTemplate, - KafkaListenerContainerFactory> containerFactory) + KafkaListenerContainerFactory> containerFactory, + ChannelInterceptor messageKeyInterceptor) { SubscribableKafkaChannel channel = new SubscribableKafkaChannel(kafkaTemplate, containerFactory, "channel-words"); channel.setGroupId("words"); + List interceptors = new ArrayList<>(1); + interceptors.add(messageKeyInterceptor); + channel.setInterceptors(interceptors); return channel; } @@ -119,6 +131,22 @@ public class SplitterApplication return new DirectChannel(); } + @Bean + ChannelInterceptor messageKeyInterceptor() + { + return new ChannelInterceptor() { + @Override + public Message preSend(Message message, MessageChannel channel) + { + String key = message.getHeaders().get("key", String.class); + return MessageBuilder + .fromMessage(message) + .setHeader("kafka_messageKey", key) + .build(); + } + }; + } + @GlobalChannelInterceptor @Bean ChannelInterceptor globalLoggingWireTap(MessageChannel messageLog)