X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fwordcount%2Fsplitter%2FSplitterApplication.java;fp=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fwordcount%2Fsplitter%2FSplitterApplication.java;h=7087ed1bbed2cb835868c7fc90116455004d7dd5;hb=8d0ef2423c0f6ef6a439b98a1d9b1b1a64d89d45;hp=8bd928b555b743e4af8be9c20db42143fd746083;hpb=26fd715128501924feb9c056f785db33e2ecbb71;p=demos%2Fkafka%2Fwordcount diff --git a/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterApplication.java b/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterApplication.java index 8bd928b..7087ed1 100644 --- a/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterApplication.java +++ b/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterApplication.java @@ -21,6 +21,9 @@ import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.listener.ContainerProperties; import org.springframework.kafka.listener.KafkaMessageListenerContainer; +import org.springframework.integration.kafka.channel.SubscribableKafkaChannel; +import org.springframework.kafka.config.KafkaListenerContainerFactory; +import org.springframework.kafka.listener.AbstractMessageListenerContainer; import org.springframework.messaging.MessageHandler; import java.util.HashMap; @@ -67,6 +70,17 @@ public class SplitterApplication new ContainerProperties(properties.getInputTopic())); } + @Bean + SubscribableKafkaChannel recordings( + KafkaTemplate kafkaTemplate, + KafkaListenerContainerFactory> containerFactory) + { + SubscribableKafkaChannel channel = + new SubscribableKafkaChannel(kafkaTemplate, containerFactory, "channel-recordings"); + channel.setGroupId("recordings"); + return channel; + } + @Bean @ServiceActivator(inputChannel = "words") MessageHandler handler( @@ -82,6 +96,17 @@ public class SplitterApplication return handler; } + @Bean + SubscribableKafkaChannel words( + KafkaTemplate kafkaTemplate, + KafkaListenerContainerFactory> containerFactory) + { + SubscribableKafkaChannel channel = + new SubscribableKafkaChannel(kafkaTemplate, containerFactory, "channel-words"); + channel.setGroupId("words"); + return channel; + } + public static void main(String[] args) { SpringApplication.run(SplitterApplication.class, args);