X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fwordcount%2Fsplitter%2FSplitterApplication.java;fp=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fwordcount%2Fsplitter%2FSplitterApplication.java;h=d9ba135f48f949dc9c63d586d949609895c6e739;hb=8aaf63efd2d666aa43825ef99f81de394788fc20;hp=0a70551d18e60499d8009b53f2e762c3f171f388;hpb=7b017bc9954ecf26af755d43dd02999468cd94ea;p=demos%2Fkafka%2Fwordcount diff --git a/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterApplication.java b/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterApplication.java index 0a70551..d9ba135 100644 --- a/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterApplication.java +++ b/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterApplication.java @@ -29,10 +29,17 @@ import org.springframework.messaging.support.ChannelInterceptor; public class SplitterApplication { @Bean - MessageChannel words() + MessageChannel words(ChannelInterceptor keyInterceptor) { DirectChannel words = new DirectChannel(); - words.addInterceptor(new ChannelInterceptor() + words.addInterceptor(keyInterceptor); + return words; + } + + @Bean + ChannelInterceptor keyInterceptor () + { + return new ChannelInterceptor() { @Override public Message preSend(Message message, MessageChannel channel) @@ -45,8 +52,7 @@ public class SplitterApplication .setHeader(KafkaHeaders.MESSAGE_KEY, key) .build(); } - }); - return words; + }; } @InboundChannelAdapter(channel = "recordings")