1 package de.juplo.kafka.wordcount.splitter;
3 import org.springframework.boot.SpringApplication;
4 import org.springframework.boot.autoconfigure.SpringBootApplication;
5 import org.springframework.boot.context.properties.EnableConfigurationProperties;
6 import org.springframework.context.annotation.Bean;
7 import org.springframework.expression.Expression;
8 import org.springframework.expression.ExpressionParser;
9 import org.springframework.expression.common.LiteralExpression;
10 import org.springframework.expression.spel.standard.SpelExpressionParser;
11 import org.springframework.integration.annotation.ServiceActivator;
12 import org.springframework.integration.annotation.Transformer;
13 import org.springframework.integration.channel.DirectChannel;
14 import org.springframework.integration.channel.interceptor.WireTap;
15 import org.springframework.integration.config.EnableIntegration;
16 import org.springframework.integration.config.GlobalChannelInterceptor;
17 import org.springframework.integration.core.MessageProducer;
18 import org.springframework.integration.handler.LoggingHandler;
19 import org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter;
20 import org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler;
21 import org.springframework.integration.transformer.HeaderEnricher;
22 import org.springframework.integration.transformer.support.ExpressionEvaluatingHeaderValueMessageProcessor;
23 import org.springframework.integration.transformer.support.HeaderValueMessageProcessor;
24 import org.springframework.kafka.core.ConsumerFactory;
25 import org.springframework.kafka.core.KafkaTemplate;
26 import org.springframework.kafka.listener.ContainerProperties;
27 import org.springframework.kafka.listener.KafkaMessageListenerContainer;
28 import org.springframework.integration.kafka.channel.SubscribableKafkaChannel;
29 import org.springframework.kafka.config.KafkaListenerContainerFactory;
30 import org.springframework.kafka.listener.AbstractMessageListenerContainer;
31 import org.springframework.messaging.MessageChannel;
32 import org.springframework.messaging.MessageHandler;
33 import org.springframework.messaging.support.ChannelInterceptor;
35 import java.util.HashMap;
// Spring Boot application class of the word-count splitter.
// Declares SplitterApplicationProperties as a configuration-properties class;
// the methods that follow define the Kafka / Spring Integration components.
// NOTE(review): this listing skips some original lines (see the gaps in the
// embedded line numbers), so further class-level annotations may exist.
39 @SpringBootApplication
40 @EnableConfigurationProperties(SplitterApplicationProperties.class)
42 public class SplitterApplication
// Builds the inbound adapter: wraps the given listener container in a
// KafkaMessageDrivenChannelAdapter and routes its output to the channel
// named "kafka-in".
// The return statement is not visible in this listing; presumably the
// adapter is returned -- confirm.
45 MessageProducer messageProducer(
46 KafkaMessageListenerContainer<String, String> messageListenerContainer)
48 KafkaMessageDrivenChannelAdapter<String, String> adapter =
// NOTE(review): the constructor is invoked as a raw type (no <>), which
// produces an unchecked-conversion warning.
49 new KafkaMessageDrivenChannelAdapter(messageListenerContainer);
50 adapter.setOutputChannelName("kafka-in");
// Transformer between the "kafka-in" and "recordings" channels that enriches
// message headers.
55 @Transformer(inputChannel = "kafka-in", outputChannel = "recordings")
56 public HeaderEnricher kafkaInHeaderEnricher()
// Header processors handed to the HeaderEnricher constructor below.
58 Map<String, HeaderValueMessageProcessor<?>> headersToAdd = new HashMap<>();
// SpEL expression reading the 'kafka_receivedMessageKey' header of the message.
59 Expression expression =
60 new SpelExpressionParser().parseExpression("headers['kafka_receivedMessageKey']");
// Processor that evaluates the expression with String as the expected type.
// The line that puts it into headersToAdd (and the header name used) is not
// visible here; presumably the name is 'key', which handler() reads -- confirm.
63 new ExpressionEvaluatingHeaderValueMessageProcessor<>(expression, String.class));
64 HeaderEnricher enricher = new HeaderEnricher(headersToAdd);
// Creates the KafkaMessageListenerContainer whose ContainerProperties are
// built from the input topic configured in the application properties.
// The first constructor argument is not visible in this listing; presumably
// it is consumerFactory -- confirm.
69 KafkaMessageListenerContainer<String, String> messageListenerContainer(
70 ConsumerFactory<String,String> consumerFactory,
71 SplitterApplicationProperties properties)
74 new KafkaMessageListenerContainer<>(
76 new ContainerProperties(properties.getInputTopic()));
// Defines the "recordings" channel as a SubscribableKafkaChannel built from
// the given template and container factory.
// The return statement is not visible in this listing.
80 SubscribableKafkaChannel recordings(
81 KafkaTemplate<String, String> kafkaTemplate,
82 KafkaListenerContainerFactory<AbstractMessageListenerContainer<String, String>> containerFactory)
84 SubscribableKafkaChannel channel =
// "channel-recordings" is presumably the Kafka topic backing the channel --
// confirm against the SubscribableKafkaChannel constructor documentation.
85 new SubscribableKafkaChannel(kafkaTemplate, containerFactory, "channel-recordings");
// Group id for the channel is set to "recordings".
86 channel.setGroupId("recordings");
// Service activator on the "words" channel: a KafkaProducerMessageHandler
// that uses the given KafkaTemplate.
// The return statement is not visible in this listing.
91 @ServiceActivator(inputChannel = "words")
92 MessageHandler handler(
93 KafkaTemplate<String, String> kafkaTemplate,
94 SplitterApplicationProperties properties)
96 KafkaProducerMessageHandler<String, String> handler =
97 new KafkaProducerMessageHandler<>(kafkaTemplate);
// Topic is fixed: a literal expression holding the configured output topic.
98 handler.setTopicExpression(new LiteralExpression(properties.getOutputTopic()));
// Message key is taken per message from the 'key' header via SpEL.
99 final ExpressionParser parser = new SpelExpressionParser();
100 Expression expression = parser.parseExpression("headers['key']");
101 handler.setMessageKeyExpression(expression);
// Defines the "words" channel as a SubscribableKafkaChannel built from the
// given template and container factory; same construction as recordings().
// The return statement is not visible in this listing.
106 SubscribableKafkaChannel words(
107 KafkaTemplate<String, String> kafkaTemplate,
108 KafkaListenerContainerFactory<AbstractMessageListenerContainer<String, String>> containerFactory)
110 SubscribableKafkaChannel channel =
// "channel-words" is presumably the Kafka topic backing the channel --
// confirm against the SubscribableKafkaChannel constructor documentation.
111 new SubscribableKafkaChannel(kafkaTemplate, containerFactory, "channel-words");
// Group id for the channel is set to "words".
112 channel.setGroupId("words");
// Provides the "messageLog" channel as a plain DirectChannel; it is the
// target of the wire tap and the input of the logging handler in this class.
117 MessageChannel messageLog()
119 return new DirectChannel();
// Global channel interceptor: a WireTap that sends to the messageLog channel.
// No channel patterns are given on the annotation, so which channels are
// intercepted depends on its defaults -- presumably all; confirm.
122 @GlobalChannelInterceptor
124 ChannelInterceptor globalLoggingWireTap(MessageChannel messageLog)
126 return new WireTap(messageLog);
// Service activator on the "messageLog" channel: a LoggingHandler configured
// for DEBUG level and the logger name "MESSAGE_LOG".
// The return statement is not visible in this listing.
130 @ServiceActivator(inputChannel = "messageLog")
131 public LoggingHandler logging()
133 LoggingHandler adapter = new LoggingHandler(LoggingHandler.Level.DEBUG);
134 adapter.setLoggerName("MESSAGE_LOG");
// Logged text is built from the message id, the payload and all headers.
135 adapter.setLogExpressionString("headers.id + ': ' + payload + ', headers=' + headers");
// JVM entry point: starts the Spring application with SplitterApplication as
// the primary source and passes the command-line arguments through.
140 public static void main(String[] args)
142 SpringApplication.run(SplitterApplication.class, args);