From 57b7d26c4721fda7770abf944bc811667679dfe9 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Mon, 18 Jul 2022 18:09:41 +0200 Subject: [PATCH] WIP --- .../splitter/SplitterApplication.java | 32 ++++++++++++++++--- 1 file changed, 27 insertions(+), 5 deletions(-) diff --git a/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterApplication.java b/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterApplication.java index 3fec844..555c216 100644 --- a/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterApplication.java +++ b/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterApplication.java @@ -9,13 +9,17 @@ import org.springframework.expression.Expression; import org.springframework.expression.ExpressionParser; import org.springframework.expression.common.LiteralExpression; import org.springframework.expression.spel.standard.SpelExpressionParser; +import org.springframework.integration.annotation.InboundChannelAdapter; import org.springframework.integration.annotation.ServiceActivator; import org.springframework.integration.config.EnableIntegration; import org.springframework.integration.kafka.channel.SubscribableKafkaChannel; +import org.springframework.integration.kafka.inbound.KafkaMessageSource; import org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler; import org.springframework.kafka.config.KafkaListenerContainerFactory; +import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.listener.AbstractMessageListenerContainer; +import org.springframework.kafka.listener.ConsumerProperties; import org.springframework.messaging.MessageHandler; @@ -27,16 +31,23 @@ public class SplitterApplication @Bean SubscribableKafkaChannel recordings( KafkaTemplate kafkaTemplate, - KafkaListenerContainerFactory> containerFactory, - SplitterApplicationProperties properties, - @Value("${spring.kafka.consumer.group-id}") String groupId) + KafkaListenerContainerFactory> containerFactory) { SubscribableKafkaChannel channel = - new SubscribableKafkaChannel(kafkaTemplate, containerFactory, properties.getInputTopic()); - channel.setGroupId(groupId); + new SubscribableKafkaChannel(kafkaTemplate, containerFactory, "channel-recordings"); + channel.setGroupId("recordings"); return channel; } + @InboundChannelAdapter(channel = "recordings") + @Bean + KafkaMessageSource source( + ConsumerFactory cf, + SplitterApplicationProperties properties) + { + return new KafkaMessageSource<>(cf, new ConsumerProperties(properties.getInputTopic())); + } + @Bean @ServiceActivator(inputChannel = "words") MessageHandler handler( @@ -52,6 +63,17 @@ public class SplitterApplication return handler; } + @Bean + SubscribableKafkaChannel words( + KafkaTemplate kafkaTemplate, + KafkaListenerContainerFactory> containerFactory) + { + SubscribableKafkaChannel channel = + new SubscribableKafkaChannel(kafkaTemplate, containerFactory, "channel-words"); + channel.setGroupId("words"); + return channel; + } + public static void main(String[] args) { SpringApplication.run(SplitterApplication.class, args); -- 2.20.1