splitter: 1.0.0-spring-ingetration - fixed the ordering of the messages
[demos/kafka/wordcount] / src / main / java / de / juplo / kafka / wordcount / splitter / SplitterApplication.java
index d9ba135..cfe7a4a 100644 (file)
@@ -4,23 +4,40 @@ import org.springframework.boot.SpringApplication;
 import org.springframework.boot.autoconfigure.SpringBootApplication;
 import org.springframework.boot.context.properties.EnableConfigurationProperties;
 import org.springframework.context.annotation.Bean;
+import org.springframework.expression.Expression;
+import org.springframework.expression.ExpressionParser;
 import org.springframework.expression.common.LiteralExpression;
-import org.springframework.integration.annotation.InboundChannelAdapter;
+import org.springframework.expression.spel.standard.SpelExpressionParser;
 import org.springframework.integration.annotation.ServiceActivator;
+import org.springframework.integration.annotation.Transformer;
 import org.springframework.integration.channel.DirectChannel;
+import org.springframework.integration.channel.interceptor.WireTap;
 import org.springframework.integration.config.EnableIntegration;
-import org.springframework.integration.kafka.inbound.KafkaMessageSource;
+import org.springframework.integration.config.GlobalChannelInterceptor;
+import org.springframework.integration.core.MessageProducer;
+import org.springframework.integration.handler.LoggingHandler;
+import org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter;
 import org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler;
-import org.springframework.integration.support.MessageBuilder;
+import org.springframework.integration.transformer.HeaderEnricher;
+import org.springframework.integration.transformer.support.ExpressionEvaluatingHeaderValueMessageProcessor;
+import org.springframework.integration.transformer.support.HeaderValueMessageProcessor;
 import org.springframework.kafka.core.ConsumerFactory;
 import org.springframework.kafka.core.KafkaTemplate;
-import org.springframework.kafka.listener.ConsumerProperties;
-import org.springframework.kafka.support.KafkaHeaders;
+import org.springframework.kafka.listener.ContainerProperties;
+import org.springframework.kafka.listener.KafkaMessageListenerContainer;
+import org.springframework.integration.kafka.channel.SubscribableKafkaChannel;
+import org.springframework.kafka.config.KafkaListenerContainerFactory;
+import org.springframework.kafka.listener.AbstractMessageListenerContainer;
 import org.springframework.messaging.Message;
 import org.springframework.messaging.MessageChannel;
 import org.springframework.messaging.MessageHandler;
-import org.springframework.messaging.MessageHeaders;
 import org.springframework.messaging.support.ChannelInterceptor;
+import org.springframework.messaging.support.MessageBuilder;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 
 
 @SpringBootApplication
@@ -29,39 +46,53 @@ import org.springframework.messaging.support.ChannelInterceptor;
 public class SplitterApplication
 {
        @Bean
-       MessageChannel words(ChannelInterceptor keyInterceptor)
+       MessageProducer messageProducer(
+                       KafkaMessageListenerContainer<String, String> messageListenerContainer)
        {
-               DirectChannel words = new DirectChannel();
-               words.addInterceptor(keyInterceptor);
-               return words;
+               KafkaMessageDrivenChannelAdapter<String, String> adapter =
+                               new KafkaMessageDrivenChannelAdapter(messageListenerContainer);
+               adapter.setOutputChannelName("kafka-in");
+               return adapter;
        }
 
        @Bean
-       ChannelInterceptor keyInterceptor ()
+       @Transformer(inputChannel = "kafka-in", outputChannel = "recordings")
+       public HeaderEnricher kafkaInHeaderEnricher()
        {
-               return new ChannelInterceptor()
-               {
-                       @Override
-                       public Message<?> preSend(Message<?> message, MessageChannel channel)
-                       {
-                               MessageHeaders headers = message.getHeaders();
-                               Object key = headers.get(KafkaHeaders.RECEIVED_MESSAGE_KEY);
-                               return
-                                               MessageBuilder
-                                                               .fromMessage(message)
-                                                               .setHeader(KafkaHeaders.MESSAGE_KEY, key)
-                                                               .build();
-                       }
-               };
+               Map<String, HeaderValueMessageProcessor<?>> headersToAdd = new HashMap<>();
+               Expression expression =
+                               new SpelExpressionParser().parseExpression("headers['kafka_receivedMessageKey']");
+               headersToAdd.put(
+                               "key",
+                               new ExpressionEvaluatingHeaderValueMessageProcessor<>(expression, String.class));
+               HeaderEnricher enricher = new HeaderEnricher(headersToAdd);
+               return enricher;
        }
 
-       @InboundChannelAdapter(channel = "recordings")
        @Bean
-       KafkaMessageSource<String, String> source(
-                       ConsumerFactory<String, String> cf,
+       KafkaMessageListenerContainer<String, String> messageListenerContainer(
+                       ConsumerFactory<String,String> consumerFactory,
                        SplitterApplicationProperties properties)
        {
-               return new KafkaMessageSource<>(cf, new ConsumerProperties(properties.getInputTopic()));
+               return
+                               new KafkaMessageListenerContainer<>(
+                                               consumerFactory,
+                                               new ContainerProperties(properties.getInputTopic()));
+       }
+
+       @Bean
+       SubscribableKafkaChannel recordings(
+                       KafkaTemplate<String, String> kafkaTemplate,
+                       KafkaListenerContainerFactory<AbstractMessageListenerContainer<String, String>> containerFactory,
+                       ChannelInterceptor messageKeyInterceptor)
+       {
+               SubscribableKafkaChannel channel =
+                               new SubscribableKafkaChannel(kafkaTemplate, containerFactory, "channel-recordings");
+               channel.setGroupId("recordings");
+               List<ChannelInterceptor> interceptors = new ArrayList<>(1);
+               interceptors.add(messageKeyInterceptor);
+               channel.setInterceptors(interceptors);
+               return channel;
        }
 
        @Bean
@@ -73,9 +104,67 @@ public class SplitterApplication
                KafkaProducerMessageHandler<String, String> handler =
                                new KafkaProducerMessageHandler<>(kafkaTemplate);
                handler.setTopicExpression(new LiteralExpression(properties.getOutputTopic()));
+               final ExpressionParser parser = new SpelExpressionParser();
+               Expression expression = parser.parseExpression("headers['key']");
+               handler.setMessageKeyExpression(expression);
                return handler;
        }
 
+       @Bean
+       SubscribableKafkaChannel words(
+                       KafkaTemplate<String, String> kafkaTemplate,
+                       KafkaListenerContainerFactory<AbstractMessageListenerContainer<String, String>> containerFactory,
+                       ChannelInterceptor messageKeyInterceptor)
+       {
+               SubscribableKafkaChannel channel =
+                               new SubscribableKafkaChannel(kafkaTemplate, containerFactory, "channel-words");
+               channel.setGroupId("words");
+               List<ChannelInterceptor> interceptors = new ArrayList<>(1);
+               interceptors.add(messageKeyInterceptor);
+               channel.setInterceptors(interceptors);
+               return channel;
+       }
+
+       @Bean
+       MessageChannel messageLog()
+       {
+               return new DirectChannel();
+       }
+
+       @Bean
+       ChannelInterceptor messageKeyInterceptor()
+       {
+               return new ChannelInterceptor() {
+                       @Override
+                       public Message<?> preSend(Message<?> message, MessageChannel channel)
+                       {
+                               String key = message.getHeaders().get("key", String.class);
+                               return MessageBuilder
+                                               .fromMessage(message)
+                                               .setHeader("kafka_messageKey", key)
+                                               .build();
+                       }
+               };
+       }
+
+       @GlobalChannelInterceptor
+       @Bean
+       ChannelInterceptor globalLoggingWireTap(MessageChannel messageLog)
+       {
+               return new WireTap(messageLog);
+       }
+
+       @Bean
+       @ServiceActivator(inputChannel = "messageLog")
+       public LoggingHandler logging()
+       {
+               LoggingHandler adapter = new LoggingHandler(LoggingHandler.Level.DEBUG);
+               adapter.setLoggerName("MESSAGE_LOG");
+               adapter.setLogExpressionString("headers.id + ': ' + payload + ', headers=' + headers");
+               return adapter;
+       }
+
+
        public static void main(String[] args)
        {
                SpringApplication.run(SplitterApplication.class, args);