splitter: 1.0.0-spring-ingetration - fixed the ordering of the messages splitter-spring-integration
authorKai Moritz <kai@juplo.de>
Wed, 20 Jul 2022 19:02:42 +0000 (21:02 +0200)
committerKai Moritz <kai@juplo.de>
Wed, 20 Jul 2022 19:02:42 +0000 (21:02 +0200)
* Added a `ChannelInterceptor` to the `KafkaSubscribableChannel`, that
  adds the value of the `key`-Header as Header `kafka_messageKey`).
* This fixes the ordering, because now, all words, that are split of from
  a recording of a user are send with the same key.

src/main/java/de/juplo/kafka/wordcount/splitter/SplitterApplication.java

index 3dcb443..cfe7a4a 100644 (file)
@@ -28,11 +28,15 @@ import org.springframework.kafka.listener.KafkaMessageListenerContainer;
 import org.springframework.integration.kafka.channel.SubscribableKafkaChannel;
 import org.springframework.kafka.config.KafkaListenerContainerFactory;
 import org.springframework.kafka.listener.AbstractMessageListenerContainer;
+import org.springframework.messaging.Message;
 import org.springframework.messaging.MessageChannel;
 import org.springframework.messaging.MessageHandler;
 import org.springframework.messaging.support.ChannelInterceptor;
+import org.springframework.messaging.support.MessageBuilder;
 
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 
@@ -79,11 +83,15 @@ public class SplitterApplication
        @Bean
        SubscribableKafkaChannel recordings(
                        KafkaTemplate<String, String> kafkaTemplate,
-                       KafkaListenerContainerFactory<AbstractMessageListenerContainer<String, String>> containerFactory)
+                       KafkaListenerContainerFactory<AbstractMessageListenerContainer<String, String>> containerFactory,
+                       ChannelInterceptor messageKeyInterceptor)
        {
                SubscribableKafkaChannel channel =
                                new SubscribableKafkaChannel(kafkaTemplate, containerFactory, "channel-recordings");
                channel.setGroupId("recordings");
+               List<ChannelInterceptor> interceptors = new ArrayList<>(1);
+               interceptors.add(messageKeyInterceptor);
+               channel.setInterceptors(interceptors);
                return channel;
        }
 
@@ -105,11 +113,15 @@ public class SplitterApplication
        @Bean
        SubscribableKafkaChannel words(
                        KafkaTemplate<String, String> kafkaTemplate,
-                       KafkaListenerContainerFactory<AbstractMessageListenerContainer<String, String>> containerFactory)
+                       KafkaListenerContainerFactory<AbstractMessageListenerContainer<String, String>> containerFactory,
+                       ChannelInterceptor messageKeyInterceptor)
        {
                SubscribableKafkaChannel channel =
                                new SubscribableKafkaChannel(kafkaTemplate, containerFactory, "channel-words");
                channel.setGroupId("words");
+               List<ChannelInterceptor> interceptors = new ArrayList<>(1);
+               interceptors.add(messageKeyInterceptor);
+               channel.setInterceptors(interceptors);
                return channel;
        }
 
@@ -119,6 +131,22 @@ public class SplitterApplication
                return new DirectChannel();
        }
 
+       @Bean
+       ChannelInterceptor messageKeyInterceptor()
+       {
+               return new ChannelInterceptor() {
+                       @Override
+                       public Message<?> preSend(Message<?> message, MessageChannel channel)
+                       {
+                               String key = message.getHeaders().get("key", String.class);
+                               return MessageBuilder
+                                               .fromMessage(message)
+                                               .setHeader("kafka_messageKey", key)
+                                               .build();
+                       }
+               };
+       }
+
        @GlobalChannelInterceptor
        @Bean
        ChannelInterceptor globalLoggingWireTap(MessageChannel messageLog)