1 package de.juplo.kafka.wordcount.splitter;
3 import org.springframework.boot.SpringApplication;
4 import org.springframework.boot.autoconfigure.SpringBootApplication;
5 import org.springframework.boot.context.properties.EnableConfigurationProperties;
6 import org.springframework.context.annotation.Bean;
7 import org.springframework.expression.Expression;
8 import org.springframework.expression.ExpressionParser;
9 import org.springframework.expression.common.LiteralExpression;
10 import org.springframework.expression.spel.standard.SpelExpressionParser;
11 import org.springframework.integration.annotation.ServiceActivator;
12 import org.springframework.integration.annotation.Transformer;
13 import org.springframework.integration.channel.DirectChannel;
14 import org.springframework.integration.channel.interceptor.WireTap;
15 import org.springframework.integration.config.EnableIntegration;
16 import org.springframework.integration.config.GlobalChannelInterceptor;
17 import org.springframework.integration.core.MessageProducer;
18 import org.springframework.integration.handler.LoggingHandler;
19 import org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter;
20 import org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler;
21 import org.springframework.integration.transformer.HeaderEnricher;
22 import org.springframework.integration.transformer.support.ExpressionEvaluatingHeaderValueMessageProcessor;
23 import org.springframework.integration.transformer.support.HeaderValueMessageProcessor;
24 import org.springframework.kafka.core.ConsumerFactory;
25 import org.springframework.kafka.core.KafkaTemplate;
26 import org.springframework.kafka.listener.ContainerProperties;
27 import org.springframework.kafka.listener.KafkaMessageListenerContainer;
28 import org.springframework.integration.kafka.channel.SubscribableKafkaChannel;
29 import org.springframework.kafka.config.KafkaListenerContainerFactory;
30 import org.springframework.kafka.listener.AbstractMessageListenerContainer;
31 import org.springframework.messaging.Message;
32 import org.springframework.messaging.MessageChannel;
33 import org.springframework.messaging.MessageHandler;
34 import org.springframework.messaging.support.ChannelInterceptor;
35 import org.springframework.messaging.support.MessageBuilder;
37 import java.util.ArrayList;
38 import java.util.HashMap;
39 import java.util.List;
43 @SpringBootApplication
44 @EnableConfigurationProperties(SplitterApplicationProperties.class)
46 public class SplitterApplication
49 MessageProducer messageProducer(
50 KafkaMessageListenerContainer<String, String> messageListenerContainer)
52 KafkaMessageDrivenChannelAdapter<String, String> adapter =
53 new KafkaMessageDrivenChannelAdapter(messageListenerContainer);
54 adapter.setOutputChannelName("kafka-in");
59 @Transformer(inputChannel = "kafka-in", outputChannel = "recordings")
60 public HeaderEnricher kafkaInHeaderEnricher()
62 Map<String, HeaderValueMessageProcessor<?>> headersToAdd = new HashMap<>();
63 Expression expression =
64 new SpelExpressionParser().parseExpression("headers['kafka_receivedMessageKey']");
67 new ExpressionEvaluatingHeaderValueMessageProcessor<>(expression, String.class));
68 HeaderEnricher enricher = new HeaderEnricher(headersToAdd);
73 KafkaMessageListenerContainer<String, String> messageListenerContainer(
74 ConsumerFactory<String,String> consumerFactory,
75 SplitterApplicationProperties properties)
78 new KafkaMessageListenerContainer<>(
80 new ContainerProperties(properties.getInputTopic()));
84 SubscribableKafkaChannel recordings(
85 KafkaTemplate<String, String> kafkaTemplate,
86 KafkaListenerContainerFactory<AbstractMessageListenerContainer<String, String>> containerFactory,
87 ChannelInterceptor messageKeyInterceptor)
89 SubscribableKafkaChannel channel =
90 new SubscribableKafkaChannel(kafkaTemplate, containerFactory, "channel-recordings");
91 channel.setGroupId("recordings");
92 List<ChannelInterceptor> interceptors = new ArrayList<>(1);
93 interceptors.add(messageKeyInterceptor);
94 channel.setInterceptors(interceptors);
99 @ServiceActivator(inputChannel = "words")
100 MessageHandler handler(
101 KafkaTemplate<String, String> kafkaTemplate,
102 SplitterApplicationProperties properties)
104 KafkaProducerMessageHandler<String, String> handler =
105 new KafkaProducerMessageHandler<>(kafkaTemplate);
106 handler.setTopicExpression(new LiteralExpression(properties.getOutputTopic()));
107 final ExpressionParser parser = new SpelExpressionParser();
108 Expression expression = parser.parseExpression("headers['key']");
109 handler.setMessageKeyExpression(expression);
114 SubscribableKafkaChannel words(
115 KafkaTemplate<String, String> kafkaTemplate,
116 KafkaListenerContainerFactory<AbstractMessageListenerContainer<String, String>> containerFactory,
117 ChannelInterceptor messageKeyInterceptor)
119 SubscribableKafkaChannel channel =
120 new SubscribableKafkaChannel(kafkaTemplate, containerFactory, "channel-words");
121 channel.setGroupId("words");
122 List<ChannelInterceptor> interceptors = new ArrayList<>(1);
123 interceptors.add(messageKeyInterceptor);
124 channel.setInterceptors(interceptors);
129 MessageChannel messageLog()
131 return new DirectChannel();
135 ChannelInterceptor messageKeyInterceptor()
137 return new ChannelInterceptor() {
139 public Message<?> preSend(Message<?> message, MessageChannel channel)
141 String key = message.getHeaders().get("key", String.class);
142 return MessageBuilder
143 .fromMessage(message)
144 .setHeader("kafka_messageKey", key)
150 @GlobalChannelInterceptor
152 ChannelInterceptor globalLoggingWireTap(MessageChannel messageLog)
154 return new WireTap(messageLog);
158 @ServiceActivator(inputChannel = "messageLog")
159 public LoggingHandler logging()
161 LoggingHandler adapter = new LoggingHandler(LoggingHandler.Level.DEBUG);
162 adapter.setLoggerName("MESSAGE_LOG");
163 adapter.setLogExpressionString("headers.id + ': ' + payload + ', headers=' + headers");
168 public static void main(String[] args)
170 SpringApplication.run(SplitterApplication.class, args);