X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fwordcount%2Fsplitter%2FSplitterApplication.java;fp=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fwordcount%2Fsplitter%2FSplitterApplication.java;h=412f429af31eea2dab8bd8b60916f9cddfa636c5;hb=9c9ffbe3316ed295533c576e823794aa6de99665;hp=d46a7cd4c4b3fd9e2b73f251d9e74f44f93772c7;hpb=0e63376a3cfc8ecabdc4699e9307f6a51415cb09;p=demos%2Fkafka%2Fwordcount diff --git a/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterApplication.java b/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterApplication.java index d46a7cd..412f429 100644 --- a/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterApplication.java +++ b/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterApplication.java @@ -4,20 +4,43 @@ import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Bean; - -import java.time.Clock; +import org.springframework.expression.common.LiteralExpression; +import org.springframework.integration.annotation.InboundChannelAdapter; +import org.springframework.integration.annotation.ServiceActivator; +import org.springframework.integration.config.EnableIntegration; +import org.springframework.integration.kafka.inbound.KafkaMessageSource; +import org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler; +import org.springframework.kafka.core.ConsumerFactory; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.listener.ConsumerProperties; +import org.springframework.messaging.MessageHandler; @SpringBootApplication @EnableConfigurationProperties(SplitterApplicationProperties.class) +@EnableIntegration public class SplitterApplication { + @InboundChannelAdapter(channel = "recordings") @Bean - Clock clock() + KafkaMessageSource source( + ConsumerFactory cf, + SplitterApplicationProperties properties) { - return Clock.systemDefaultZone(); + return new KafkaMessageSource<>(cf, new ConsumerProperties(properties.getInputTopic())); } + @Bean + @ServiceActivator(inputChannel = "words") + MessageHandler handler( + KafkaTemplate kafkaTemplate, + SplitterApplicationProperties properties) + { + KafkaProducerMessageHandler handler = + new KafkaProducerMessageHandler<>(kafkaTemplate); + handler.setTopicExpression(new LiteralExpression(properties.getOutputTopic())); + return handler; + } public static void main(String[] args) {