From 8273e88013d9297b3830b50a3a6ce148a51edf76 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Thu, 30 Jun 2022 21:26:25 +0200 Subject: [PATCH] WIP --- .../wordcount/splitter/SplitterApplication.java | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterApplication.java b/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterApplication.java index 5d399f4..e73d2bb 100644 --- a/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterApplication.java +++ b/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterApplication.java @@ -6,12 +6,16 @@ import org.springframework.boot.context.properties.EnableConfigurationProperties import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.ImportResource; import org.springframework.expression.common.LiteralExpression; +import org.springframework.integration.annotation.InboundChannelAdapter; import org.springframework.integration.annotation.ServiceActivator; import org.springframework.integration.channel.DirectChannel; import org.springframework.integration.config.EnableIntegration; +import org.springframework.integration.kafka.inbound.KafkaMessageSource; import org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler; import org.springframework.integration.support.MessageBuilder; +import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.listener.ConsumerProperties; import org.springframework.kafka.support.KafkaHeaders; import org.springframework.messaging.Message; import org.springframework.messaging.MessageChannel; @@ -53,6 +57,15 @@ public class SplitterApplication }; } + @InboundChannelAdapter(channel = "recordings") + @Bean + KafkaMessageSource source( + ConsumerFactory cf, + SplitterApplicationProperties properties) + { + return new KafkaMessageSource<>(cf, new ConsumerProperties(properties.getInputTopic())); + } + @Bean @ServiceActivator(inputChannel = "words") MessageHandler handler( -- 2.20.1