3828050e00aa90529924cfc0f65d49568e2da254
[demos/kafka/wordcount] / src / main / java / de / juplo / kafka / wordcount / splitter / SplitterApplication.java
1 package de.juplo.kafka.wordcount.splitter;
2
3 import org.springframework.boot.SpringApplication;
4 import org.springframework.boot.autoconfigure.SpringBootApplication;
5 import org.springframework.boot.context.properties.EnableConfigurationProperties;
6 import org.springframework.context.annotation.Bean;
7 import org.springframework.expression.Expression;
8 import org.springframework.expression.ExpressionParser;
9 import org.springframework.expression.common.LiteralExpression;
10 import org.springframework.expression.spel.standard.SpelExpressionParser;
11 import org.springframework.integration.annotation.ServiceActivator;
12 import org.springframework.integration.config.EnableIntegration;
13 import org.springframework.integration.core.MessageProducer;
14 import org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter;
15 import org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler;
16 import org.springframework.kafka.core.ConsumerFactory;
17 import org.springframework.kafka.core.KafkaTemplate;
18 import org.springframework.kafka.listener.ContainerProperties;
19 import org.springframework.kafka.listener.KafkaMessageListenerContainer;
20 import org.springframework.messaging.MessageHandler;
21
22
23 @SpringBootApplication
24 @EnableConfigurationProperties(SplitterApplicationProperties.class)
25 @EnableIntegration
26 public class SplitterApplication
27 {
28         @Bean
29         MessageProducer messageProducer(
30                         KafkaMessageListenerContainer<String, String> messageListenerContainer)
31         {
32                 KafkaMessageDrivenChannelAdapter<String, String> adapter =
33                                 new KafkaMessageDrivenChannelAdapter(messageListenerContainer);
34                 adapter.setOutputChannelName("recordings");
35                 return adapter;
36         }
37
38         @Bean
39         KafkaMessageListenerContainer<String, String> messageListenerContainer(
40                         ConsumerFactory<String,String> consumerFactory,
41                         SplitterApplicationProperties properties)
42         {
43                 return
44                                 new KafkaMessageListenerContainer<>(
45                                                 consumerFactory,
46                                                 new ContainerProperties(properties.getInputTopic()));
47         }
48
49         @Bean
50         @ServiceActivator(inputChannel = "words")
51         MessageHandler handler(
52                         KafkaTemplate<String, String> kafkaTemplate,
53                         SplitterApplicationProperties properties)
54         {
55                 KafkaProducerMessageHandler<String, String> handler =
56                                 new KafkaProducerMessageHandler<>(kafkaTemplate);
57                 handler.setTopicExpression(new LiteralExpression(properties.getOutputTopic()));
58                 final ExpressionParser parser = new SpelExpressionParser();
59                 Expression expression = parser.parseExpression("headers['kafka_receivedMessageKey']");
60                 handler.setMessageKeyExpression(expression);
61                 return handler;
62         }
63
64         public static void main(String[] args)
65         {
66                 SpringApplication.run(SplitterApplication.class, args);
67         }
68 }