splitter: 1.0.0-spring-ingetration - using a channel backed by Kafka spring-integration--dirty-hack-SubscribableKafkaChanel-as-input
authorKai Moritz <kai@juplo.de>
Sun, 17 Jul 2022 17:03:18 +0000 (19:03 +0200)
committerKai Moritz <kai@juplo.de>
Mon, 18 Jul 2022 16:37:42 +0000 (18:37 +0200)
* Instead of the `@InboundChannelAdapter`, a `SubscribableKafkaChanel`
  could be used.
* But in the given setup, this is kind of a dirty hack, because with this
  konfiguration the intentionally read-only input-channel becomes writable
  in Spring Integration.

src/main/java/de/juplo/kafka/wordcount/splitter/SplitterApplication.java

index 246fffd..eeeeb39 100644 (file)
@@ -1,5 +1,6 @@
 package de.juplo.kafka.wordcount.splitter;
 
+import org.springframework.beans.factory.annotation.Value;
 import org.springframework.boot.SpringApplication;
 import org.springframework.boot.autoconfigure.SpringBootApplication;
 import org.springframework.boot.context.properties.EnableConfigurationProperties;
@@ -8,15 +9,15 @@ import org.springframework.expression.Expression;
 import org.springframework.expression.ExpressionParser;
 import org.springframework.expression.common.LiteralExpression;
 import org.springframework.expression.spel.standard.SpelExpressionParser;
-import org.springframework.integration.annotation.InboundChannelAdapter;
 import org.springframework.integration.annotation.ServiceActivator;
 import org.springframework.integration.config.EnableIntegration;
-import org.springframework.integration.kafka.inbound.KafkaMessageSource;
+import org.springframework.integration.kafka.channel.SubscribableKafkaChannel;
 import org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler;
-import org.springframework.kafka.core.ConsumerFactory;
+import org.springframework.kafka.config.KafkaListenerContainerFactory;
 import org.springframework.kafka.core.KafkaTemplate;
-import org.springframework.kafka.listener.ConsumerProperties;
+import org.springframework.kafka.listener.AbstractMessageListenerContainer;
 import org.springframework.messaging.MessageHandler;
+import org.springframework.messaging.SubscribableChannel;
 
 
 @SpringBootApplication
@@ -24,13 +25,17 @@ import org.springframework.messaging.MessageHandler;
 @EnableIntegration
 public class SplitterApplication
 {
-       @InboundChannelAdapter(channel = "recordings")
        @Bean
-       KafkaMessageSource<String, String> source(
-                       ConsumerFactory<String, String> cf,
-                       SplitterApplicationProperties properties)
+       SubscribableChannel recordings(
+                       KafkaTemplate<String, String> kafkaTemplate,
+                       KafkaListenerContainerFactory<AbstractMessageListenerContainer<String, String>> containerFactory,
+                       SplitterApplicationProperties properties,
+                       @Value("${spring.kafka.consumer.group-id}") String groupId)
        {
-               return new KafkaMessageSource<>(cf, new ConsumerProperties(properties.getInputTopic()));
+               SubscribableKafkaChannel channel =
+                               new SubscribableKafkaChannel(kafkaTemplate, containerFactory, properties.getInputTopic());
+               channel.setGroupId(groupId);
+               return channel;
        }
 
        @Bean