WIP
authorKai Moritz <kai@juplo.de>
Thu, 30 Jun 2022 19:26:25 +0000 (21:26 +0200)
committerKai Moritz <kai@juplo.de>
Thu, 30 Jun 2022 19:34:51 +0000 (21:34 +0200)
src/main/java/de/juplo/kafka/wordcount/splitter/SplitterApplication.java

index 5d399f4..e73d2bb 100644 (file)
@@ -6,12 +6,16 @@ import org.springframework.boot.context.properties.EnableConfigurationProperties
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.ImportResource;
 import org.springframework.expression.common.LiteralExpression;
+import org.springframework.integration.annotation.InboundChannelAdapter;
 import org.springframework.integration.annotation.ServiceActivator;
 import org.springframework.integration.channel.DirectChannel;
 import org.springframework.integration.config.EnableIntegration;
+import org.springframework.integration.kafka.inbound.KafkaMessageSource;
 import org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler;
 import org.springframework.integration.support.MessageBuilder;
+import org.springframework.kafka.core.ConsumerFactory;
 import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.listener.ConsumerProperties;
 import org.springframework.kafka.support.KafkaHeaders;
 import org.springframework.messaging.Message;
 import org.springframework.messaging.MessageChannel;
@@ -53,6 +57,15 @@ public class SplitterApplication
                };
        }
 
+       @InboundChannelAdapter(channel = "recordings")
+       @Bean
+       KafkaMessageSource<String, String> source(
+                       ConsumerFactory<String, String> cf,
+                       SplitterApplicationProperties properties)
+       {
+               return new KafkaMessageSource<>(cf, new ConsumerProperties(properties.getInputTopic()));
+       }
+
        @Bean
        @ServiceActivator(inputChannel = "words")
        MessageHandler handler(