WIP
authorKai Moritz <kai@juplo.de>
Thu, 30 Jun 2022 19:45:23 +0000 (21:45 +0200)
committerKai Moritz <kai@juplo.de>
Sun, 17 Jul 2022 15:24:52 +0000 (17:24 +0200)
src/main/java/de/juplo/kafka/wordcount/splitter/SplitterApplication.java
src/main/resources/integration.xml

index 3d4b48f..22576a0 100644 (file)
@@ -5,19 +5,11 @@ import org.springframework.boot.autoconfigure.SpringBootApplication;
 import org.springframework.boot.context.properties.EnableConfigurationProperties;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.ImportResource;
-import org.springframework.expression.Expression;
-import org.springframework.expression.ExpressionParser;
-import org.springframework.expression.common.LiteralExpression;
-import org.springframework.expression.spel.standard.SpelExpressionParser;
 import org.springframework.integration.annotation.InboundChannelAdapter;
-import org.springframework.integration.annotation.ServiceActivator;
 import org.springframework.integration.config.EnableIntegration;
 import org.springframework.integration.kafka.inbound.KafkaMessageSource;
-import org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler;
 import org.springframework.kafka.core.ConsumerFactory;
-import org.springframework.kafka.core.KafkaTemplate;
 import org.springframework.kafka.listener.ConsumerProperties;
-import org.springframework.messaging.MessageHandler;
 
 
 @SpringBootApplication
@@ -35,21 +27,6 @@ public class SplitterApplication
                return new KafkaMessageSource<>(cf, new ConsumerProperties(properties.getInputTopic()));
        }
 
-       @Bean
-       @ServiceActivator(inputChannel = "words")
-       MessageHandler handler(
-                       KafkaTemplate<String, String> kafkaTemplate,
-                       SplitterApplicationProperties properties)
-       {
-               KafkaProducerMessageHandler<String, String> handler =
-                               new KafkaProducerMessageHandler<>(kafkaTemplate);
-               handler.setTopicExpression(new LiteralExpression(properties.getOutputTopic()));
-               final ExpressionParser parser = new SpelExpressionParser();
-               Expression expression = parser.parseExpression("headers['kafka_receivedMessageKey']");
-               handler.setMessageKeyExpression(expression);
-               return handler;
-       }
-
        public static void main(String[] args)
        {
                SpringApplication.run(SplitterApplication.class, args);
index 2495877..4ecea8b 100644 (file)
@@ -8,4 +8,11 @@
                http://www.springframework.org/schema/integration https://www.springframework.org/schema/integration/spring-integration.xsd
         http://www.springframework.org/schema/integration/kafka http://www.springframework.org/schema/integration/kafka/spring-integration-kafka.xsd">
 
+  <int-kafka:inbound-channel-adapter
+      id="recordings"
+      consumer-factory="kafkaConsumerFactory"
+      payload-type="java.lang.String"
+      group-id="splitter"
+      channel="recordings" />
+
 </beans>