WIP wip-kafka-backed-channels
authorKai Moritz <kai@juplo.de>
Mon, 18 Jul 2022 16:09:41 +0000 (18:09 +0200)
committerKai Moritz <kai@juplo.de>
Mon, 18 Jul 2022 16:09:41 +0000 (18:09 +0200)
src/main/java/de/juplo/kafka/wordcount/splitter/SplitterApplication.java

index 3fec844..555c216 100644 (file)
@@ -9,13 +9,17 @@ import org.springframework.expression.Expression;
 import org.springframework.expression.ExpressionParser;
 import org.springframework.expression.common.LiteralExpression;
 import org.springframework.expression.spel.standard.SpelExpressionParser;
+import org.springframework.integration.annotation.InboundChannelAdapter;
 import org.springframework.integration.annotation.ServiceActivator;
 import org.springframework.integration.config.EnableIntegration;
 import org.springframework.integration.kafka.channel.SubscribableKafkaChannel;
+import org.springframework.integration.kafka.inbound.KafkaMessageSource;
 import org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler;
 import org.springframework.kafka.config.KafkaListenerContainerFactory;
+import org.springframework.kafka.core.ConsumerFactory;
 import org.springframework.kafka.core.KafkaTemplate;
 import org.springframework.kafka.listener.AbstractMessageListenerContainer;
+import org.springframework.kafka.listener.ConsumerProperties;
 import org.springframework.messaging.MessageHandler;
 
 
@@ -27,16 +31,23 @@ public class SplitterApplication
        @Bean
        SubscribableKafkaChannel recordings(
                        KafkaTemplate<String, String> kafkaTemplate,
-                       KafkaListenerContainerFactory<AbstractMessageListenerContainer<String, String>> containerFactory,
-                       SplitterApplicationProperties properties,
-                       @Value("${spring.kafka.consumer.group-id}") String groupId)
+                       KafkaListenerContainerFactory<AbstractMessageListenerContainer<String, String>> containerFactory)
        {
                SubscribableKafkaChannel channel =
-                               new SubscribableKafkaChannel(kafkaTemplate, containerFactory, properties.getInputTopic());
-               channel.setGroupId(groupId);
+                               new SubscribableKafkaChannel(kafkaTemplate, containerFactory, "channel-recordings");
+               channel.setGroupId("recordings");
                return channel;
        }
 
+       @InboundChannelAdapter(channel = "recordings")
+       @Bean
+       KafkaMessageSource<String, String> source(
+                       ConsumerFactory<String, String> cf,
+                       SplitterApplicationProperties properties)
+       {
+               return new KafkaMessageSource<>(cf, new ConsumerProperties(properties.getInputTopic()));
+       }
+
        @Bean
        @ServiceActivator(inputChannel = "words")
        MessageHandler handler(
@@ -52,6 +63,17 @@ public class SplitterApplication
                return handler;
        }
 
+       @Bean
+       SubscribableKafkaChannel words(
+                       KafkaTemplate<String, String> kafkaTemplate,
+                       KafkaListenerContainerFactory<AbstractMessageListenerContainer<String, String>> containerFactory)
+       {
+               SubscribableKafkaChannel channel =
+                               new SubscribableKafkaChannel(kafkaTemplate, containerFactory, "channel-words");
+               channel.setGroupId("words");
+               return channel;
+       }
+
        public static void main(String[] args)
        {
                SpringApplication.run(SplitterApplication.class, args);