splitter: 1.0.0-spring-ingetration - extract the key on the inbound channel
[demos/kafka/wordcount] / src / main / java / de / juplo / kafka / wordcount / splitter / SplitterApplication.java
index d46a7cd..8bd928b 100644 (file)
@@ -4,20 +4,83 @@ import org.springframework.boot.SpringApplication;
 import org.springframework.boot.autoconfigure.SpringBootApplication;
 import org.springframework.boot.context.properties.EnableConfigurationProperties;
 import org.springframework.context.annotation.Bean;
+import org.springframework.expression.Expression;
+import org.springframework.expression.ExpressionParser;
+import org.springframework.expression.common.LiteralExpression;
+import org.springframework.expression.spel.standard.SpelExpressionParser;
+import org.springframework.integration.annotation.ServiceActivator;
+import org.springframework.integration.annotation.Transformer;
+import org.springframework.integration.config.EnableIntegration;
+import org.springframework.integration.core.MessageProducer;
+import org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter;
+import org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler;
+import org.springframework.integration.transformer.HeaderEnricher;
+import org.springframework.integration.transformer.support.ExpressionEvaluatingHeaderValueMessageProcessor;
+import org.springframework.integration.transformer.support.HeaderValueMessageProcessor;
+import org.springframework.kafka.core.ConsumerFactory;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.listener.ContainerProperties;
+import org.springframework.kafka.listener.KafkaMessageListenerContainer;
+import org.springframework.messaging.MessageHandler;
 
-import java.time.Clock;
+import java.util.HashMap;
+import java.util.Map;
 
 
 @SpringBootApplication
 @EnableConfigurationProperties(SplitterApplicationProperties.class)
+@EnableIntegration
 public class SplitterApplication
 {
        @Bean
-       Clock clock()
+       MessageProducer messageProducer(
+                       KafkaMessageListenerContainer<String, String> messageListenerContainer)
        {
-               return Clock.systemDefaultZone();
+               KafkaMessageDrivenChannelAdapter<String, String> adapter =
+                               new KafkaMessageDrivenChannelAdapter(messageListenerContainer);
+               adapter.setOutputChannelName("kafka-in");
+               return adapter;
        }
 
+       @Bean
+       @Transformer(inputChannel = "kafka-in", outputChannel = "recordings")
+       public HeaderEnricher kafkaInHeaderEnricher()
+       {
+               Map<String, HeaderValueMessageProcessor<?>> headersToAdd = new HashMap<>();
+               Expression expression =
+                               new SpelExpressionParser().parseExpression("headers['kafka_receivedMessageKey']");
+               headersToAdd.put(
+                               "key",
+                               new ExpressionEvaluatingHeaderValueMessageProcessor<>(expression, String.class));
+               HeaderEnricher enricher = new HeaderEnricher(headersToAdd);
+               return enricher;
+       }
+
+       @Bean
+       KafkaMessageListenerContainer<String, String> messageListenerContainer(
+                       ConsumerFactory<String,String> consumerFactory,
+                       SplitterApplicationProperties properties)
+       {
+               return
+                               new KafkaMessageListenerContainer<>(
+                                               consumerFactory,
+                                               new ContainerProperties(properties.getInputTopic()));
+       }
+
+       @Bean
+       @ServiceActivator(inputChannel = "words")
+       MessageHandler handler(
+                       KafkaTemplate<String, String> kafkaTemplate,
+                       SplitterApplicationProperties properties)
+       {
+               KafkaProducerMessageHandler<String, String> handler =
+                               new KafkaProducerMessageHandler<>(kafkaTemplate);
+               handler.setTopicExpression(new LiteralExpression(properties.getOutputTopic()));
+               final ExpressionParser parser = new SpelExpressionParser();
+               Expression expression = parser.parseExpression("headers['key']");
+               handler.setMessageKeyExpression(expression);
+               return handler;
+       }
 
        public static void main(String[] args)
        {