1 package de.juplo.kafka.wordcount.splitter;
3 import org.springframework.boot.SpringApplication;
4 import org.springframework.boot.autoconfigure.SpringBootApplication;
5 import org.springframework.boot.context.properties.EnableConfigurationProperties;
6 import org.springframework.context.annotation.Bean;
7 import org.springframework.expression.Expression;
8 import org.springframework.expression.ExpressionParser;
9 import org.springframework.expression.common.LiteralExpression;
10 import org.springframework.expression.spel.standard.SpelExpressionParser;
11 import org.springframework.integration.annotation.ServiceActivator;
12 import org.springframework.integration.annotation.Transformer;
13 import org.springframework.integration.config.EnableIntegration;
14 import org.springframework.integration.core.MessageProducer;
15 import org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter;
16 import org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler;
17 import org.springframework.integration.transformer.HeaderEnricher;
18 import org.springframework.integration.transformer.support.ExpressionEvaluatingHeaderValueMessageProcessor;
19 import org.springframework.integration.transformer.support.HeaderValueMessageProcessor;
20 import org.springframework.kafka.core.ConsumerFactory;
21 import org.springframework.kafka.core.KafkaTemplate;
22 import org.springframework.kafka.listener.ContainerProperties;
23 import org.springframework.kafka.listener.KafkaMessageListenerContainer;
24 import org.springframework.integration.kafka.channel.SubscribableKafkaChannel;
25 import org.springframework.kafka.config.KafkaListenerContainerFactory;
26 import org.springframework.kafka.listener.AbstractMessageListenerContainer;
27 import org.springframework.messaging.MessageHandler;
29 import java.util.HashMap;
/**
 * Spring Boot application class of the splitter service.
 *
 * The methods visible in this class wire a Spring Integration flow around
 * Kafka: an inbound adapter publishes to the channel "kafka-in", a header
 * enricher forwards from "kafka-in" to "recordings", and a producer handler
 * subscribed to "words" writes to the configured output topic.
 * NOTE(review): nothing visible in this class connects "recordings" to
 * "words"; presumably that step is defined elsewhere. Confirm.
 */
33 @SpringBootApplication
34 @EnableConfigurationProperties(SplitterApplicationProperties.class)
36 public class SplitterApplication
/**
 * Creates the inbound channel adapter that wraps the given Kafka listener
 * container and publishes what it receives to the channel "kafka-in".
 *
 * @param messageListenerContainer the listener container the adapter is built around
 * @return NOTE(review): the return statement is outside this excerpt;
 *         presumably the configured adapter. Confirm.
 */
39 MessageProducer messageProducer(
40 KafkaMessageListenerContainer<String, String> messageListenerContainer)
// Diamond instead of the raw constructor, so the compiler checks the
// key/value type arguments against the container's <String, String>.
42 KafkaMessageDrivenChannelAdapter<String, String> adapter =
43 new KafkaMessageDrivenChannelAdapter<>(messageListenerContainer);
// Output of the adapter goes to the channel looked up by the name "kafka-in".
44 adapter.setOutputChannelName("kafka-in");
/**
 * Transformer between the channels "kafka-in" and "recordings" that builds a
 * {@link HeaderEnricher} from a map of header value processors.
 *
 * The visible processor evaluates the SpEL expression
 * {@code headers['kafka_receivedMessageKey']} and requests the result as a
 * {@code String}.
 * NOTE(review): the statement that registers this processor in the map, and
 * with it the name of the header being added, is not visible in this
 * excerpt. Confirm against the full file.
 *
 * @return NOTE(review): the return statement is not visible; presumably the
 *         enricher created at the end of the method. Confirm.
 */
49 @Transformer(inputChannel = "kafka-in", outputChannel = "recordings")
50 public HeaderEnricher kafkaInHeaderEnricher()
// Maps a header name to the processor that computes that header's value.
52 Map<String, HeaderValueMessageProcessor<?>> headersToAdd = new HashMap<>();
// Expression that reads the header named kafka_receivedMessageKey from the message.
53 Expression expression =
54 new SpelExpressionParser().parseExpression("headers['kafka_receivedMessageKey']");
57 new ExpressionEvaluatingHeaderValueMessageProcessor<>(expression, String.class));
58 HeaderEnricher enricher = new HeaderEnricher(headersToAdd);
/**
 * Builds a {@link KafkaMessageListenerContainer} whose container properties
 * are created from the configured input topic.
 *
 * @param consumerFactory NOTE(review): its use is not visible in this excerpt;
 *                        presumably the first constructor argument. Confirm.
 * @param properties      supplies the input topic via {@code getInputTopic()}
 */
63 KafkaMessageListenerContainer<String, String> messageListenerContainer(
64 ConsumerFactory<String,String> consumerFactory,
65 SplitterApplicationProperties properties)
68 new KafkaMessageListenerContainer<>(
// The container properties are constructed with the input topic from the application properties.
70 new ContainerProperties(properties.getInputTopic()));
/**
 * Creates a {@link SubscribableKafkaChannel} constructed with the string
 * "channel-recordings" and the group id "recordings".
 *
 * NOTE(review): per the Spring Integration API the third constructor argument
 * is the Kafka topic backing the channel. Confirm against the library version in use.
 *
 * @param kafkaTemplate    template passed to the channel constructor
 * @param containerFactory listener container factory passed to the channel constructor
 * @return NOTE(review): the return statement is not visible; presumably the
 *         configured channel. Confirm.
 */
74 SubscribableKafkaChannel recordings(
75 KafkaTemplate<String, String> kafkaTemplate,
76 KafkaListenerContainerFactory<AbstractMessageListenerContainer<String, String>> containerFactory)
78 SubscribableKafkaChannel channel =
79 new SubscribableKafkaChannel(kafkaTemplate, containerFactory, "channel-recordings");
80 channel.setGroupId("recordings");
/**
 * Service activator on the channel "words" that builds a
 * {@link KafkaProducerMessageHandler} on top of the given template.
 *
 * The handler's topic is a literal expression holding the configured output
 * topic; its message key is the SpEL expression {@code headers['key']}.
 *
 * @param kafkaTemplate template the producer handler is constructed with
 * @param properties    supplies the output topic via {@code getOutputTopic()}
 * @return NOTE(review): the return statement is not visible; presumably the
 *         configured handler. Confirm.
 */
85 @ServiceActivator(inputChannel = "words")
86 MessageHandler handler(
87 KafkaTemplate<String, String> kafkaTemplate,
88 SplitterApplicationProperties properties)
90 KafkaProducerMessageHandler<String, String> handler =
91 new KafkaProducerMessageHandler<>(kafkaTemplate);
// LiteralExpression: the topic is the fixed string read from the properties here.
92 handler.setTopicExpression(new LiteralExpression(properties.getOutputTopic()));
93 final ExpressionParser parser = new SpelExpressionParser();
// The message key is read from the header named "key" of each message.
94 Expression expression = parser.parseExpression("headers['key']");
95 handler.setMessageKeyExpression(expression);
/**
 * Creates a {@link SubscribableKafkaChannel} constructed with the string
 * "channel-words" and the group id "words".
 *
 * NOTE(review): per the Spring Integration API the third constructor argument
 * is the Kafka topic backing the channel. Confirm against the library version in use.
 *
 * @param kafkaTemplate    template passed to the channel constructor
 * @param containerFactory listener container factory passed to the channel constructor
 * @return NOTE(review): the return statement is not visible; presumably the
 *         configured channel. Confirm.
 */
100 SubscribableKafkaChannel words(
101 KafkaTemplate<String, String> kafkaTemplate,
102 KafkaListenerContainerFactory<AbstractMessageListenerContainer<String, String>> containerFactory)
104 SubscribableKafkaChannel channel =
105 new SubscribableKafkaChannel(kafkaTemplate, containerFactory, "channel-words");
106 channel.setGroupId("words");
/**
 * Program entry point: hands control to Spring Boot with this class as the
 * primary source.
 *
 * @param args command-line arguments, passed through unchanged to {@code SpringApplication.run}
 */
110 public static void main(String[] args)
112 SpringApplication.run(SplitterApplication.class, args);