1 package de.juplo.kafka.wordcount.splitter;
3 import org.springframework.boot.SpringApplication;
4 import org.springframework.boot.autoconfigure.SpringBootApplication;
5 import org.springframework.boot.context.properties.EnableConfigurationProperties;
6 import org.springframework.context.annotation.Bean;
7 import org.springframework.expression.Expression;
8 import org.springframework.expression.ExpressionParser;
9 import org.springframework.expression.common.LiteralExpression;
10 import org.springframework.expression.spel.standard.SpelExpressionParser;
11 import org.springframework.integration.annotation.ServiceActivator;
12 import org.springframework.integration.annotation.Transformer;
13 import org.springframework.integration.config.EnableIntegration;
14 import org.springframework.integration.core.MessageProducer;
15 import org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter;
16 import org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler;
17 import org.springframework.integration.transformer.HeaderEnricher;
18 import org.springframework.integration.transformer.support.ExpressionEvaluatingHeaderValueMessageProcessor;
19 import org.springframework.integration.transformer.support.HeaderValueMessageProcessor;
20 import org.springframework.kafka.core.ConsumerFactory;
21 import org.springframework.kafka.core.KafkaTemplate;
22 import org.springframework.kafka.listener.ContainerProperties;
23 import org.springframework.kafka.listener.KafkaMessageListenerContainer;
24 import org.springframework.messaging.MessageHandler;
26 import java.util.HashMap;
// Spring Boot application class that wires Spring Integration components
// around Kafka; configuration is bound through SplitterApplicationProperties.
// NOTE(review): this view of the file carries embedded line-number residue and
// is missing lines (braces, some annotations, return statements) -- the
// comments below describe only the statements that are actually visible.
30 @SpringBootApplication
31 @EnableConfigurationProperties(SplitterApplicationProperties.class)
33 public class SplitterApplication
// Inbound side: wraps the given listener container in a
// KafkaMessageDrivenChannelAdapter and sets its output channel name to
// "kafka-in".
36 MessageProducer messageProducer(
37 KafkaMessageListenerContainer<String, String> messageListenerContainer)
39 KafkaMessageDrivenChannelAdapter<String, String> adapter =
// NOTE(review): the constructor is invoked as a raw type (no type arguments
// or diamond), which yields an unchecked conversion -- consider using <>.
40 new KafkaMessageDrivenChannelAdapter(messageListenerContainer);
41 adapter.setOutputChannelName("kafka-in");
// Transformer reading from channel "kafka-in" and writing to channel
// "recordings". It builds a HeaderEnricher from a map of header value
// processors.
46 @Transformer(inputChannel = "kafka-in", outputChannel = "recordings")
47 public HeaderEnricher kafkaInHeaderEnricher()
49 Map<String, HeaderValueMessageProcessor<?>> headersToAdd = new HashMap<>();
// SpEL expression that reads the header 'kafka_receivedMessageKey' of the
// incoming message.
50 Expression expression =
51 new SpelExpressionParser().parseExpression("headers['kafka_receivedMessageKey']");
// Processor that evaluates the expression above with String as result type.
// NOTE(review): the statement registering this processor in headersToAdd is
// not visible here -- presumably it is stored under the header name "key",
// matching the headers['key'] lookup in handler(); confirm.
54 new ExpressionEvaluatingHeaderValueMessageProcessor<>(expression, String.class));
55 HeaderEnricher enricher = new HeaderEnricher(headersToAdd);
// Creates the KafkaMessageListenerContainer; its ContainerProperties are
// constructed from properties.getInputTopic().
60 KafkaMessageListenerContainer<String, String> messageListenerContainer(
61 ConsumerFactory<String,String> consumerFactory,
62 SplitterApplicationProperties properties)
65 new KafkaMessageListenerContainer<>(
// NOTE(review): the first constructor argument is not visible in this view
// -- presumably consumerFactory; confirm.
67 new ContainerProperties(properties.getInputTopic()));
// Outbound side: service activator on channel "words" that hands messages
// to a KafkaProducerMessageHandler backed by the given KafkaTemplate.
71 @ServiceActivator(inputChannel = "words")
72 MessageHandler handler(
73 KafkaTemplate<String, String> kafkaTemplate,
74 SplitterApplicationProperties properties)
76 KafkaProducerMessageHandler<String, String> handler =
77 new KafkaProducerMessageHandler<>(kafkaTemplate);
// The topic expression is a literal: the value of properties.getOutputTopic().
78 handler.setTopicExpression(new LiteralExpression(properties.getOutputTopic()));
// The message key expression reads the message header 'key' via SpEL.
79 final ExpressionParser parser = new SpelExpressionParser();
80 Expression expression = parser.parseExpression("headers['key']");
81 handler.setMessageKeyExpression(expression);
// Entry point: delegates to SpringApplication.run with this class and the
// command-line arguments.
85 public static void main(String[] args)
87 SpringApplication.run(SplitterApplication.class, args);