WIP
authorKai Moritz <kai@juplo.de>
Thu, 30 Jun 2022 19:45:23 +0000 (21:45 +0200)
committerKai Moritz <kai@juplo.de>
Thu, 30 Jun 2022 19:45:23 +0000 (21:45 +0200)
src/main/java/de/juplo/kafka/wordcount/splitter/SplitterApplication.java
src/main/resources/integration.xml

index fd3becd..d73beff 100644 (file)
@@ -6,13 +6,10 @@ import org.springframework.boot.context.properties.EnableConfigurationProperties
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.ImportResource;
 import org.springframework.expression.common.LiteralExpression;
-import org.springframework.integration.annotation.InboundChannelAdapter;
 import org.springframework.integration.annotation.ServiceActivator;
 import org.springframework.integration.config.EnableIntegration;
-import org.springframework.integration.kafka.inbound.KafkaMessageSource;
 import org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler;
 import org.springframework.integration.support.MessageBuilder;
-import org.springframework.kafka.core.ConsumerFactory;
 import org.springframework.kafka.core.KafkaTemplate;
 import org.springframework.kafka.listener.ConsumerProperties;
 import org.springframework.kafka.support.KafkaHeaders;
@@ -48,13 +45,10 @@ public class SplitterApplication
                };
        }
 
-       @InboundChannelAdapter(channel = "recordings")
        @Bean
-       KafkaMessageSource<String, String> source(
-                       ConsumerFactory<String, String> cf,
-                       SplitterApplicationProperties properties)
+       ConsumerProperties consumerProperties(SplitterApplicationProperties properties)
        {
-               return new KafkaMessageSource<>(cf, new ConsumerProperties(properties.getInputTopic()));
+               return new ConsumerProperties(properties.getInputTopic());
        }
 
        @Bean
index 14bd157..e6ba5bf 100644 (file)
     </int:interceptors>
   </int:channel>
 
+  <int-kafka:inbound-channel-adapter
+      id="recordings"
+      consumer-factory="kafkaConsumerFactory"
+      payload-type="java.lang.String"
+      group-id="splitter"
+      channel="recordings" />
+
 </beans>