From: Kai Moritz Date: Sun, 17 Jul 2022 17:03:18 +0000 (+0200) Subject: splitter: 1.0.0-spring-ingetration - using a channel backed by Kafka X-Git-Tag: wip-kafka-backed-channels~1 X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=ec42408f67d10a033fbb2320acfce091cfb8bf73;p=demos%2Fkafka%2Fwordcount splitter: 1.0.0-spring-ingetration - using a channel backed by Kafka * Instead of the `@InboundChannelAdapter`, a `SubscribableKafkaChanel` could be used. * But in the given setup, this is kind of a dirty hack, because with this konfiguration the intentionally read-only input-channel becomes writable in Spring Integration. --- diff --git a/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterApplication.java b/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterApplication.java index 246fffd..3fec844 100644 --- a/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterApplication.java +++ b/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterApplication.java @@ -1,5 +1,6 @@ package de.juplo.kafka.wordcount.splitter; +import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.context.properties.EnableConfigurationProperties; @@ -8,14 +9,13 @@ import org.springframework.expression.Expression; import org.springframework.expression.ExpressionParser; import org.springframework.expression.common.LiteralExpression; import org.springframework.expression.spel.standard.SpelExpressionParser; -import org.springframework.integration.annotation.InboundChannelAdapter; import org.springframework.integration.annotation.ServiceActivator; import org.springframework.integration.config.EnableIntegration; -import org.springframework.integration.kafka.inbound.KafkaMessageSource; +import org.springframework.integration.kafka.channel.SubscribableKafkaChannel; import org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler; -import org.springframework.kafka.core.ConsumerFactory; +import org.springframework.kafka.config.KafkaListenerContainerFactory; import org.springframework.kafka.core.KafkaTemplate; -import org.springframework.kafka.listener.ConsumerProperties; +import org.springframework.kafka.listener.AbstractMessageListenerContainer; import org.springframework.messaging.MessageHandler; @@ -24,13 +24,17 @@ import org.springframework.messaging.MessageHandler; @EnableIntegration public class SplitterApplication { - @InboundChannelAdapter(channel = "recordings") @Bean - KafkaMessageSource source( - ConsumerFactory cf, - SplitterApplicationProperties properties) + SubscribableKafkaChannel recordings( + KafkaTemplate kafkaTemplate, + KafkaListenerContainerFactory> containerFactory, + SplitterApplicationProperties properties, + @Value("${spring.kafka.consumer.group-id}") String groupId) { - return new KafkaMessageSource<>(cf, new ConsumerProperties(properties.getInputTopic())); + SubscribableKafkaChannel channel = + new SubscribableKafkaChannel(kafkaTemplate, containerFactory, properties.getInputTopic()); + channel.setGroupId(groupId); + return channel; } @Bean