3dcb443b46370c03c2d9d81927e70c3f9f04b027
[demos/kafka/wordcount] / src / main / java / de / juplo / kafka / wordcount / splitter / SplitterApplication.java
1 package de.juplo.kafka.wordcount.splitter;
2
3 import org.springframework.boot.SpringApplication;
4 import org.springframework.boot.autoconfigure.SpringBootApplication;
5 import org.springframework.boot.context.properties.EnableConfigurationProperties;
6 import org.springframework.context.annotation.Bean;
7 import org.springframework.expression.Expression;
8 import org.springframework.expression.ExpressionParser;
9 import org.springframework.expression.common.LiteralExpression;
10 import org.springframework.expression.spel.standard.SpelExpressionParser;
11 import org.springframework.integration.annotation.ServiceActivator;
12 import org.springframework.integration.annotation.Transformer;
13 import org.springframework.integration.channel.DirectChannel;
14 import org.springframework.integration.channel.interceptor.WireTap;
15 import org.springframework.integration.config.EnableIntegration;
16 import org.springframework.integration.config.GlobalChannelInterceptor;
17 import org.springframework.integration.core.MessageProducer;
18 import org.springframework.integration.handler.LoggingHandler;
19 import org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter;
20 import org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler;
21 import org.springframework.integration.transformer.HeaderEnricher;
22 import org.springframework.integration.transformer.support.ExpressionEvaluatingHeaderValueMessageProcessor;
23 import org.springframework.integration.transformer.support.HeaderValueMessageProcessor;
24 import org.springframework.kafka.core.ConsumerFactory;
25 import org.springframework.kafka.core.KafkaTemplate;
26 import org.springframework.kafka.listener.ContainerProperties;
27 import org.springframework.kafka.listener.KafkaMessageListenerContainer;
28 import org.springframework.integration.kafka.channel.SubscribableKafkaChannel;
29 import org.springframework.kafka.config.KafkaListenerContainerFactory;
30 import org.springframework.kafka.listener.AbstractMessageListenerContainer;
31 import org.springframework.messaging.MessageChannel;
32 import org.springframework.messaging.MessageHandler;
33 import org.springframework.messaging.support.ChannelInterceptor;
34
35 import java.util.HashMap;
36 import java.util.Map;
37
38
39 @SpringBootApplication
40 @EnableConfigurationProperties(SplitterApplicationProperties.class)
41 @EnableIntegration
42 public class SplitterApplication
43 {
44         @Bean
45         MessageProducer messageProducer(
46                         KafkaMessageListenerContainer<String, String> messageListenerContainer)
47         {
48                 KafkaMessageDrivenChannelAdapter<String, String> adapter =
49                                 new KafkaMessageDrivenChannelAdapter(messageListenerContainer);
50                 adapter.setOutputChannelName("kafka-in");
51                 return adapter;
52         }
53
54         @Bean
55         @Transformer(inputChannel = "kafka-in", outputChannel = "recordings")
56         public HeaderEnricher kafkaInHeaderEnricher()
57         {
58                 Map<String, HeaderValueMessageProcessor<?>> headersToAdd = new HashMap<>();
59                 Expression expression =
60                                 new SpelExpressionParser().parseExpression("headers['kafka_receivedMessageKey']");
61                 headersToAdd.put(
62                                 "key",
63                                 new ExpressionEvaluatingHeaderValueMessageProcessor<>(expression, String.class));
64                 HeaderEnricher enricher = new HeaderEnricher(headersToAdd);
65                 return enricher;
66         }
67
68         @Bean
69         KafkaMessageListenerContainer<String, String> messageListenerContainer(
70                         ConsumerFactory<String,String> consumerFactory,
71                         SplitterApplicationProperties properties)
72         {
73                 return
74                                 new KafkaMessageListenerContainer<>(
75                                                 consumerFactory,
76                                                 new ContainerProperties(properties.getInputTopic()));
77         }
78
79         @Bean
80         SubscribableKafkaChannel recordings(
81                         KafkaTemplate<String, String> kafkaTemplate,
82                         KafkaListenerContainerFactory<AbstractMessageListenerContainer<String, String>> containerFactory)
83         {
84                 SubscribableKafkaChannel channel =
85                                 new SubscribableKafkaChannel(kafkaTemplate, containerFactory, "channel-recordings");
86                 channel.setGroupId("recordings");
87                 return channel;
88         }
89
90         @Bean
91         @ServiceActivator(inputChannel = "words")
92         MessageHandler handler(
93                         KafkaTemplate<String, String> kafkaTemplate,
94                         SplitterApplicationProperties properties)
95         {
96                 KafkaProducerMessageHandler<String, String> handler =
97                                 new KafkaProducerMessageHandler<>(kafkaTemplate);
98                 handler.setTopicExpression(new LiteralExpression(properties.getOutputTopic()));
99                 final ExpressionParser parser = new SpelExpressionParser();
100                 Expression expression = parser.parseExpression("headers['key']");
101                 handler.setMessageKeyExpression(expression);
102                 return handler;
103         }
104
105         @Bean
106         SubscribableKafkaChannel words(
107                         KafkaTemplate<String, String> kafkaTemplate,
108                         KafkaListenerContainerFactory<AbstractMessageListenerContainer<String, String>> containerFactory)
109         {
110                 SubscribableKafkaChannel channel =
111                                 new SubscribableKafkaChannel(kafkaTemplate, containerFactory, "channel-words");
112                 channel.setGroupId("words");
113                 return channel;
114         }
115
116         @Bean
117         MessageChannel messageLog()
118         {
119                 return new DirectChannel();
120         }
121
122         @GlobalChannelInterceptor
123         @Bean
124         ChannelInterceptor globalLoggingWireTap(MessageChannel messageLog)
125         {
126                 return new WireTap(messageLog);
127         }
128
129         @Bean
130         @ServiceActivator(inputChannel = "messageLog")
131         public LoggingHandler logging()
132         {
133                 LoggingHandler adapter = new LoggingHandler(LoggingHandler.Level.DEBUG);
134                 adapter.setLoggerName("MESSAGE_LOG");
135                 adapter.setLogExpressionString("headers.id + ': ' + payload + ', headers=' + headers");
136                 return adapter;
137         }
138
139
140         public static void main(String[] args)
141         {
142                 SpringApplication.run(SplitterApplication.class, args);
143         }
144 }