splitter: 1.0.0-spring-ingetration - kafka-backed channels
authorKai Moritz <kai@juplo.de>
Mon, 18 Jul 2022 16:09:41 +0000 (18:09 +0200)
committerKai Moritz <kai@juplo.de>
Wed, 20 Jul 2022 18:36:53 +0000 (20:36 +0200)
* This configuration uses `SubscribableKafkaChannel` to configure
  kafka-backed channels for the message-channels `recordings` and `words`.
* Since the header `kafka_messageKey` is not set, the messages are send
  with `null`-keys. Hence, the order of the messages will be lost for
  topics with more than one partition.

src/main/java/de/juplo/kafka/wordcount/splitter/SplitterApplication.java

index 8bd928b..7087ed1 100644 (file)
@@ -21,6 +21,9 @@ import org.springframework.kafka.core.ConsumerFactory;
 import org.springframework.kafka.core.KafkaTemplate;
 import org.springframework.kafka.listener.ContainerProperties;
 import org.springframework.kafka.listener.KafkaMessageListenerContainer;
+import org.springframework.integration.kafka.channel.SubscribableKafkaChannel;
+import org.springframework.kafka.config.KafkaListenerContainerFactory;
+import org.springframework.kafka.listener.AbstractMessageListenerContainer;
 import org.springframework.messaging.MessageHandler;
 
 import java.util.HashMap;
@@ -67,6 +70,17 @@ public class SplitterApplication
                                                new ContainerProperties(properties.getInputTopic()));
        }
 
+       @Bean
+       SubscribableKafkaChannel recordings(
+                       KafkaTemplate<String, String> kafkaTemplate,
+                       KafkaListenerContainerFactory<AbstractMessageListenerContainer<String, String>> containerFactory)
+       {
+               SubscribableKafkaChannel channel =
+                               new SubscribableKafkaChannel(kafkaTemplate, containerFactory, "channel-recordings");
+               channel.setGroupId("recordings");
+               return channel;
+       }
+
        @Bean
        @ServiceActivator(inputChannel = "words")
        MessageHandler handler(
@@ -82,6 +96,17 @@ public class SplitterApplication
                return handler;
        }
 
+       @Bean
+       SubscribableKafkaChannel words(
+                       KafkaTemplate<String, String> kafkaTemplate,
+                       KafkaListenerContainerFactory<AbstractMessageListenerContainer<String, String>> containerFactory)
+       {
+               SubscribableKafkaChannel channel =
+                               new SubscribableKafkaChannel(kafkaTemplate, containerFactory, "channel-words");
+               channel.setGroupId("words");
+               return channel;
+       }
+
        public static void main(String[] args)
        {
                SpringApplication.run(SplitterApplication.class, args);