8bd928b555b743e4af8be9c20db42143fd746083
[demos/kafka/wordcount] / src / main / java / de / juplo / kafka / wordcount / splitter / SplitterApplication.java
1 package de.juplo.kafka.wordcount.splitter;
2
3 import org.springframework.boot.SpringApplication;
4 import org.springframework.boot.autoconfigure.SpringBootApplication;
5 import org.springframework.boot.context.properties.EnableConfigurationProperties;
6 import org.springframework.context.annotation.Bean;
7 import org.springframework.expression.Expression;
8 import org.springframework.expression.ExpressionParser;
9 import org.springframework.expression.common.LiteralExpression;
10 import org.springframework.expression.spel.standard.SpelExpressionParser;
11 import org.springframework.integration.annotation.ServiceActivator;
12 import org.springframework.integration.annotation.Transformer;
13 import org.springframework.integration.config.EnableIntegration;
14 import org.springframework.integration.core.MessageProducer;
15 import org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter;
16 import org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler;
17 import org.springframework.integration.transformer.HeaderEnricher;
18 import org.springframework.integration.transformer.support.ExpressionEvaluatingHeaderValueMessageProcessor;
19 import org.springframework.integration.transformer.support.HeaderValueMessageProcessor;
20 import org.springframework.kafka.core.ConsumerFactory;
21 import org.springframework.kafka.core.KafkaTemplate;
22 import org.springframework.kafka.listener.ContainerProperties;
23 import org.springframework.kafka.listener.KafkaMessageListenerContainer;
24 import org.springframework.messaging.MessageHandler;
25
26 import java.util.HashMap;
27 import java.util.Map;
28
29
30 @SpringBootApplication
31 @EnableConfigurationProperties(SplitterApplicationProperties.class)
32 @EnableIntegration
33 public class SplitterApplication
34 {
35         @Bean
36         MessageProducer messageProducer(
37                         KafkaMessageListenerContainer<String, String> messageListenerContainer)
38         {
39                 KafkaMessageDrivenChannelAdapter<String, String> adapter =
40                                 new KafkaMessageDrivenChannelAdapter(messageListenerContainer);
41                 adapter.setOutputChannelName("kafka-in");
42                 return adapter;
43         }
44
45         @Bean
46         @Transformer(inputChannel = "kafka-in", outputChannel = "recordings")
47         public HeaderEnricher kafkaInHeaderEnricher()
48         {
49                 Map<String, HeaderValueMessageProcessor<?>> headersToAdd = new HashMap<>();
50                 Expression expression =
51                                 new SpelExpressionParser().parseExpression("headers['kafka_receivedMessageKey']");
52                 headersToAdd.put(
53                                 "key",
54                                 new ExpressionEvaluatingHeaderValueMessageProcessor<>(expression, String.class));
55                 HeaderEnricher enricher = new HeaderEnricher(headersToAdd);
56                 return enricher;
57         }
58
59         @Bean
60         KafkaMessageListenerContainer<String, String> messageListenerContainer(
61                         ConsumerFactory<String,String> consumerFactory,
62                         SplitterApplicationProperties properties)
63         {
64                 return
65                                 new KafkaMessageListenerContainer<>(
66                                                 consumerFactory,
67                                                 new ContainerProperties(properties.getInputTopic()));
68         }
69
70         @Bean
71         @ServiceActivator(inputChannel = "words")
72         MessageHandler handler(
73                         KafkaTemplate<String, String> kafkaTemplate,
74                         SplitterApplicationProperties properties)
75         {
76                 KafkaProducerMessageHandler<String, String> handler =
77                                 new KafkaProducerMessageHandler<>(kafkaTemplate);
78                 handler.setTopicExpression(new LiteralExpression(properties.getOutputTopic()));
79                 final ExpressionParser parser = new SpelExpressionParser();
80                 Expression expression = parser.parseExpression("headers['key']");
81                 handler.setMessageKeyExpression(expression);
82                 return handler;
83         }
84
85         public static void main(String[] args)
86         {
87                 SpringApplication.run(SplitterApplication.class, args);
88         }
89 }