From 8d7bc7de061041fed3a1f63adaa2b5257bf3f91d Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Thu, 30 Jun 2022 21:45:23 +0200 Subject: [PATCH] WIP --- .../splitter/SplitterApplication.java | 23 ------------------- src/main/resources/integration.xml | 7 ++++++ 2 files changed, 7 insertions(+), 23 deletions(-) diff --git a/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterApplication.java b/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterApplication.java index 3d4b48f..22576a0 100644 --- a/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterApplication.java +++ b/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterApplication.java @@ -5,19 +5,11 @@ import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.ImportResource; -import org.springframework.expression.Expression; -import org.springframework.expression.ExpressionParser; -import org.springframework.expression.common.LiteralExpression; -import org.springframework.expression.spel.standard.SpelExpressionParser; import org.springframework.integration.annotation.InboundChannelAdapter; -import org.springframework.integration.annotation.ServiceActivator; import org.springframework.integration.config.EnableIntegration; import org.springframework.integration.kafka.inbound.KafkaMessageSource; -import org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler; import org.springframework.kafka.core.ConsumerFactory; -import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.listener.ConsumerProperties; -import org.springframework.messaging.MessageHandler; @SpringBootApplication @@ -35,21 +27,6 @@ public class SplitterApplication return new KafkaMessageSource<>(cf, new ConsumerProperties(properties.getInputTopic())); } - @Bean - @ServiceActivator(inputChannel = "words") - MessageHandler handler( - KafkaTemplate kafkaTemplate, - SplitterApplicationProperties properties) - { - KafkaProducerMessageHandler handler = - new KafkaProducerMessageHandler<>(kafkaTemplate); - handler.setTopicExpression(new LiteralExpression(properties.getOutputTopic())); - final ExpressionParser parser = new SpelExpressionParser(); - Expression expression = parser.parseExpression("headers['kafka_receivedMessageKey']"); - handler.setMessageKeyExpression(expression); - return handler; - } - public static void main(String[] args) { SpringApplication.run(SplitterApplication.class, args); diff --git a/src/main/resources/integration.xml b/src/main/resources/integration.xml index 2495877..4ecea8b 100644 --- a/src/main/resources/integration.xml +++ b/src/main/resources/integration.xml @@ -8,4 +8,11 @@ http://www.springframework.org/schema/integration https://www.springframework.org/schema/integration/spring-integration.xsd http://www.springframework.org/schema/integration/kafka http://www.springframework.org/schema/integration/kafka/spring-integration-kafka.xsd"> + + -- 2.20.1