X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fwordcount%2Fsplitter%2FSplitterApplication.java;h=eeeeb395695bf88246bf55f9fc357c1119e46cfc;hb=6ed97946e9e3d4e94a6ec0cc05dc257f0558bae5;hp=d46a7cd4c4b3fd9e2b73f251d9e74f44f93772c7;hpb=d5f54354b2b44d125493c830bf0475f7992ee395;p=demos%2Fkafka%2Fwordcount diff --git a/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterApplication.java b/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterApplication.java index d46a7cd..eeeeb39 100644 --- a/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterApplication.java +++ b/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterApplication.java @@ -1,23 +1,57 @@ package de.juplo.kafka.wordcount.splitter; +import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Bean; - -import java.time.Clock; +import org.springframework.expression.Expression; +import org.springframework.expression.ExpressionParser; +import org.springframework.expression.common.LiteralExpression; +import org.springframework.expression.spel.standard.SpelExpressionParser; +import org.springframework.integration.annotation.ServiceActivator; +import org.springframework.integration.config.EnableIntegration; +import org.springframework.integration.kafka.channel.SubscribableKafkaChannel; +import org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler; +import org.springframework.kafka.config.KafkaListenerContainerFactory; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.listener.AbstractMessageListenerContainer; +import org.springframework.messaging.MessageHandler; +import org.springframework.messaging.SubscribableChannel; @SpringBootApplication @EnableConfigurationProperties(SplitterApplicationProperties.class) +@EnableIntegration public class SplitterApplication { @Bean - Clock clock() + SubscribableChannel recordings( + KafkaTemplate kafkaTemplate, + KafkaListenerContainerFactory> containerFactory, + SplitterApplicationProperties properties, + @Value("${spring.kafka.consumer.group-id}") String groupId) { - return Clock.systemDefaultZone(); + SubscribableKafkaChannel channel = + new SubscribableKafkaChannel(kafkaTemplate, containerFactory, properties.getInputTopic()); + channel.setGroupId(groupId); + return channel; } + @Bean + @ServiceActivator(inputChannel = "words") + MessageHandler handler( + KafkaTemplate kafkaTemplate, + SplitterApplicationProperties properties) + { + KafkaProducerMessageHandler handler = + new KafkaProducerMessageHandler<>(kafkaTemplate); + handler.setTopicExpression(new LiteralExpression(properties.getOutputTopic())); + final ExpressionParser parser = new SpelExpressionParser(); + Expression expression = parser.parseExpression("headers['kafka_receivedMessageKey']"); + handler.setMessageKeyExpression(expression); + return handler; + } public static void main(String[] args) {