From 8d0ef2423c0f6ef6a439b98a1d9b1b1a64d89d45 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Mon, 18 Jul 2022 18:09:41 +0200 Subject: [PATCH] splitter: 1.0.0-spring-ingetration - kafka-backed channels * This configuration uses `SubscribableKafkaChannel` to configure kafka-backed channels for the message-channels `recordings` and `words`. * Since the header `kafka_messageKey` is not set, the messages are send with `null`-keys. Hence, the order of the messages will be lost for topics with more than one partition. --- .../splitter/SplitterApplication.java | 25 +++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterApplication.java b/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterApplication.java index 8bd928b..7087ed1 100644 --- a/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterApplication.java +++ b/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterApplication.java @@ -21,6 +21,9 @@ import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.listener.ContainerProperties; import org.springframework.kafka.listener.KafkaMessageListenerContainer; +import org.springframework.integration.kafka.channel.SubscribableKafkaChannel; +import org.springframework.kafka.config.KafkaListenerContainerFactory; +import org.springframework.kafka.listener.AbstractMessageListenerContainer; import org.springframework.messaging.MessageHandler; import java.util.HashMap; @@ -67,6 +70,17 @@ public class SplitterApplication new ContainerProperties(properties.getInputTopic())); } + @Bean + SubscribableKafkaChannel recordings( + KafkaTemplate kafkaTemplate, + KafkaListenerContainerFactory> containerFactory) + { + SubscribableKafkaChannel channel = + new SubscribableKafkaChannel(kafkaTemplate, containerFactory, "channel-recordings"); + channel.setGroupId("recordings"); + return channel; + } + @Bean @ServiceActivator(inputChannel = "words") MessageHandler handler( @@ -82,6 +96,17 @@ public class SplitterApplication return handler; } + @Bean + SubscribableKafkaChannel words( + KafkaTemplate kafkaTemplate, + KafkaListenerContainerFactory> containerFactory) + { + SubscribableKafkaChannel channel = + new SubscribableKafkaChannel(kafkaTemplate, containerFactory, "channel-words"); + channel.setGroupId("words"); + return channel; + } + public static void main(String[] args) { SpringApplication.run(SplitterApplication.class, args); -- 2.20.1