From: Kai Moritz <kai@juplo.de>
Date: Thu, 30 Jun 2022 19:45:23 +0000 (+0200)
Subject: WIP
X-Git-Tag: wip-input-channel~1
X-Git-Url: https://juplo.de/gitweb/?a=commitdiff_plain;h=3918096e6f6c76f371b37555bc93596e2da4ca81;p=demos%2Fkafka%2Fwordcount

WIP
---

diff --git a/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterApplication.java b/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterApplication.java
index fd3becd..d73beff 100644
--- a/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterApplication.java
+++ b/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterApplication.java
@@ -6,13 +6,10 @@ import org.springframework.boot.context.properties.EnableConfigurationProperties
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.ImportResource;
 import org.springframework.expression.common.LiteralExpression;
-import org.springframework.integration.annotation.InboundChannelAdapter;
 import org.springframework.integration.annotation.ServiceActivator;
 import org.springframework.integration.config.EnableIntegration;
-import org.springframework.integration.kafka.inbound.KafkaMessageSource;
 import org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler;
 import org.springframework.integration.support.MessageBuilder;
-import org.springframework.kafka.core.ConsumerFactory;
 import org.springframework.kafka.core.KafkaTemplate;
 import org.springframework.kafka.listener.ConsumerProperties;
 import org.springframework.kafka.support.KafkaHeaders;
@@ -48,13 +45,10 @@ public class SplitterApplication
 		};
 	}
 
-	@InboundChannelAdapter(channel = "recordings")
 	@Bean
-	KafkaMessageSource<String, String> source(
-			ConsumerFactory<String, String> cf,
-			SplitterApplicationProperties properties)
+	ConsumerProperties consumerProperties(SplitterApplicationProperties properties)
 	{
-		return new KafkaMessageSource<>(cf, new ConsumerProperties(properties.getInputTopic()));
+		return new ConsumerProperties(properties.getInputTopic());
 	}
 
 	@Bean
diff --git a/src/main/resources/integration.xml b/src/main/resources/integration.xml
index 14bd157..e6ba5bf 100644
--- a/src/main/resources/integration.xml
+++ b/src/main/resources/integration.xml
@@ -14,4 +14,11 @@
     </int:interceptors>
   </int:channel>
 
+  <int-kafka:inbound-channel-adapter
+      id="recordings"
+      consumer-factory="kafkaConsumerFactory"
+      payload-type="java.lang.String"
+      group-id="splitter"
+      channel="recordings" />
+
 </beans>