splitter: 1.0.0-spring-ingetration - fixed the ordering of the messages
[demos/kafka/wordcount] / src / main / java / de / juplo / kafka / wordcount / splitter / SplitterApplication.java
1 package de.juplo.kafka.wordcount.splitter;
2
3 import org.springframework.boot.SpringApplication;
4 import org.springframework.boot.autoconfigure.SpringBootApplication;
5 import org.springframework.boot.context.properties.EnableConfigurationProperties;
6 import org.springframework.context.annotation.Bean;
7 import org.springframework.expression.Expression;
8 import org.springframework.expression.ExpressionParser;
9 import org.springframework.expression.common.LiteralExpression;
10 import org.springframework.expression.spel.standard.SpelExpressionParser;
11 import org.springframework.integration.annotation.ServiceActivator;
12 import org.springframework.integration.annotation.Transformer;
13 import org.springframework.integration.channel.DirectChannel;
14 import org.springframework.integration.channel.interceptor.WireTap;
15 import org.springframework.integration.config.EnableIntegration;
16 import org.springframework.integration.config.GlobalChannelInterceptor;
17 import org.springframework.integration.core.MessageProducer;
18 import org.springframework.integration.handler.LoggingHandler;
19 import org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter;
20 import org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler;
21 import org.springframework.integration.transformer.HeaderEnricher;
22 import org.springframework.integration.transformer.support.ExpressionEvaluatingHeaderValueMessageProcessor;
23 import org.springframework.integration.transformer.support.HeaderValueMessageProcessor;
24 import org.springframework.kafka.core.ConsumerFactory;
25 import org.springframework.kafka.core.KafkaTemplate;
26 import org.springframework.kafka.listener.ContainerProperties;
27 import org.springframework.kafka.listener.KafkaMessageListenerContainer;
28 import org.springframework.integration.kafka.channel.SubscribableKafkaChannel;
29 import org.springframework.kafka.config.KafkaListenerContainerFactory;
30 import org.springframework.kafka.listener.AbstractMessageListenerContainer;
31 import org.springframework.messaging.Message;
32 import org.springframework.messaging.MessageChannel;
33 import org.springframework.messaging.MessageHandler;
34 import org.springframework.messaging.support.ChannelInterceptor;
35 import org.springframework.messaging.support.MessageBuilder;
36
37 import java.util.ArrayList;
38 import java.util.HashMap;
39 import java.util.List;
40 import java.util.Map;
41
42
43 @SpringBootApplication
44 @EnableConfigurationProperties(SplitterApplicationProperties.class)
45 @EnableIntegration
46 public class SplitterApplication
47 {
48         @Bean
49         MessageProducer messageProducer(
50                         KafkaMessageListenerContainer<String, String> messageListenerContainer)
51         {
52                 KafkaMessageDrivenChannelAdapter<String, String> adapter =
53                                 new KafkaMessageDrivenChannelAdapter(messageListenerContainer);
54                 adapter.setOutputChannelName("kafka-in");
55                 return adapter;
56         }
57
58         @Bean
59         @Transformer(inputChannel = "kafka-in", outputChannel = "recordings")
60         public HeaderEnricher kafkaInHeaderEnricher()
61         {
62                 Map<String, HeaderValueMessageProcessor<?>> headersToAdd = new HashMap<>();
63                 Expression expression =
64                                 new SpelExpressionParser().parseExpression("headers['kafka_receivedMessageKey']");
65                 headersToAdd.put(
66                                 "key",
67                                 new ExpressionEvaluatingHeaderValueMessageProcessor<>(expression, String.class));
68                 HeaderEnricher enricher = new HeaderEnricher(headersToAdd);
69                 return enricher;
70         }
71
72         @Bean
73         KafkaMessageListenerContainer<String, String> messageListenerContainer(
74                         ConsumerFactory<String,String> consumerFactory,
75                         SplitterApplicationProperties properties)
76         {
77                 return
78                                 new KafkaMessageListenerContainer<>(
79                                                 consumerFactory,
80                                                 new ContainerProperties(properties.getInputTopic()));
81         }
82
83         @Bean
84         SubscribableKafkaChannel recordings(
85                         KafkaTemplate<String, String> kafkaTemplate,
86                         KafkaListenerContainerFactory<AbstractMessageListenerContainer<String, String>> containerFactory,
87                         ChannelInterceptor messageKeyInterceptor)
88         {
89                 SubscribableKafkaChannel channel =
90                                 new SubscribableKafkaChannel(kafkaTemplate, containerFactory, "channel-recordings");
91                 channel.setGroupId("recordings");
92                 List<ChannelInterceptor> interceptors = new ArrayList<>(1);
93                 interceptors.add(messageKeyInterceptor);
94                 channel.setInterceptors(interceptors);
95                 return channel;
96         }
97
98         @Bean
99         @ServiceActivator(inputChannel = "words")
100         MessageHandler handler(
101                         KafkaTemplate<String, String> kafkaTemplate,
102                         SplitterApplicationProperties properties)
103         {
104                 KafkaProducerMessageHandler<String, String> handler =
105                                 new KafkaProducerMessageHandler<>(kafkaTemplate);
106                 handler.setTopicExpression(new LiteralExpression(properties.getOutputTopic()));
107                 final ExpressionParser parser = new SpelExpressionParser();
108                 Expression expression = parser.parseExpression("headers['key']");
109                 handler.setMessageKeyExpression(expression);
110                 return handler;
111         }
112
113         @Bean
114         SubscribableKafkaChannel words(
115                         KafkaTemplate<String, String> kafkaTemplate,
116                         KafkaListenerContainerFactory<AbstractMessageListenerContainer<String, String>> containerFactory,
117                         ChannelInterceptor messageKeyInterceptor)
118         {
119                 SubscribableKafkaChannel channel =
120                                 new SubscribableKafkaChannel(kafkaTemplate, containerFactory, "channel-words");
121                 channel.setGroupId("words");
122                 List<ChannelInterceptor> interceptors = new ArrayList<>(1);
123                 interceptors.add(messageKeyInterceptor);
124                 channel.setInterceptors(interceptors);
125                 return channel;
126         }
127
128         @Bean
129         MessageChannel messageLog()
130         {
131                 return new DirectChannel();
132         }
133
134         @Bean
135         ChannelInterceptor messageKeyInterceptor()
136         {
137                 return new ChannelInterceptor() {
138                         @Override
139                         public Message<?> preSend(Message<?> message, MessageChannel channel)
140                         {
141                                 String key = message.getHeaders().get("key", String.class);
142                                 return MessageBuilder
143                                                 .fromMessage(message)
144                                                 .setHeader("kafka_messageKey", key)
145                                                 .build();
146                         }
147                 };
148         }
149
150         @GlobalChannelInterceptor
151         @Bean
152         ChannelInterceptor globalLoggingWireTap(MessageChannel messageLog)
153         {
154                 return new WireTap(messageLog);
155         }
156
157         @Bean
158         @ServiceActivator(inputChannel = "messageLog")
159         public LoggingHandler logging()
160         {
161                 LoggingHandler adapter = new LoggingHandler(LoggingHandler.Level.DEBUG);
162                 adapter.setLoggerName("MESSAGE_LOG");
163                 adapter.setLogExpressionString("headers.id + ': ' + payload + ', headers=' + headers");
164                 return adapter;
165         }
166
167
168         public static void main(String[] args)
169         {
170                 SpringApplication.run(SplitterApplication.class, args);
171         }
172 }