246fffd66a37442954eb689ace8c86cd1dd34736
[demos/kafka/wordcount] / src / main / java / de / juplo / kafka / wordcount / splitter / SplitterApplication.java
1 package de.juplo.kafka.wordcount.splitter;
2
3 import org.springframework.boot.SpringApplication;
4 import org.springframework.boot.autoconfigure.SpringBootApplication;
5 import org.springframework.boot.context.properties.EnableConfigurationProperties;
6 import org.springframework.context.annotation.Bean;
7 import org.springframework.expression.Expression;
8 import org.springframework.expression.ExpressionParser;
9 import org.springframework.expression.common.LiteralExpression;
10 import org.springframework.expression.spel.standard.SpelExpressionParser;
11 import org.springframework.integration.annotation.InboundChannelAdapter;
12 import org.springframework.integration.annotation.ServiceActivator;
13 import org.springframework.integration.config.EnableIntegration;
14 import org.springframework.integration.kafka.inbound.KafkaMessageSource;
15 import org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler;
16 import org.springframework.kafka.core.ConsumerFactory;
17 import org.springframework.kafka.core.KafkaTemplate;
18 import org.springframework.kafka.listener.ConsumerProperties;
19 import org.springframework.messaging.MessageHandler;
20
21
22 @SpringBootApplication
23 @EnableConfigurationProperties(SplitterApplicationProperties.class)
24 @EnableIntegration
25 public class SplitterApplication
26 {
27         @InboundChannelAdapter(channel = "recordings")
28         @Bean
29         KafkaMessageSource<String, String> source(
30                         ConsumerFactory<String, String> cf,
31                         SplitterApplicationProperties properties)
32         {
33                 return new KafkaMessageSource<>(cf, new ConsumerProperties(properties.getInputTopic()));
34         }
35
36         @Bean
37         @ServiceActivator(inputChannel = "words")
38         MessageHandler handler(
39                         KafkaTemplate<String, String> kafkaTemplate,
40                         SplitterApplicationProperties properties)
41         {
42                 KafkaProducerMessageHandler<String, String> handler =
43                                 new KafkaProducerMessageHandler<>(kafkaTemplate);
44                 handler.setTopicExpression(new LiteralExpression(properties.getOutputTopic()));
45                 final ExpressionParser parser = new SpelExpressionParser();
46                 Expression expression = parser.parseExpression("headers['kafka_receivedMessageKey']");
47                 handler.setMessageKeyExpression(expression);
48                 return handler;
49         }
50
51         public static void main(String[] args)
52         {
53                 SpringApplication.run(SplitterApplication.class, args);
54         }
55 }