d9ba135f48f949dc9c63d586d949609895c6e739
[demos/kafka/wordcount] / src / main / java / de / juplo / kafka / wordcount / splitter / SplitterApplication.java
1 package de.juplo.kafka.wordcount.splitter;
2
3 import org.springframework.boot.SpringApplication;
4 import org.springframework.boot.autoconfigure.SpringBootApplication;
5 import org.springframework.boot.context.properties.EnableConfigurationProperties;
6 import org.springframework.context.annotation.Bean;
7 import org.springframework.expression.common.LiteralExpression;
8 import org.springframework.integration.annotation.InboundChannelAdapter;
9 import org.springframework.integration.annotation.ServiceActivator;
10 import org.springframework.integration.channel.DirectChannel;
11 import org.springframework.integration.config.EnableIntegration;
12 import org.springframework.integration.kafka.inbound.KafkaMessageSource;
13 import org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler;
14 import org.springframework.integration.support.MessageBuilder;
15 import org.springframework.kafka.core.ConsumerFactory;
16 import org.springframework.kafka.core.KafkaTemplate;
17 import org.springframework.kafka.listener.ConsumerProperties;
18 import org.springframework.kafka.support.KafkaHeaders;
19 import org.springframework.messaging.Message;
20 import org.springframework.messaging.MessageChannel;
21 import org.springframework.messaging.MessageHandler;
22 import org.springframework.messaging.MessageHeaders;
23 import org.springframework.messaging.support.ChannelInterceptor;
24
25
26 @SpringBootApplication
27 @EnableConfigurationProperties(SplitterApplicationProperties.class)
28 @EnableIntegration
29 public class SplitterApplication
30 {
31         @Bean
32         MessageChannel words(ChannelInterceptor keyInterceptor)
33         {
34                 DirectChannel words = new DirectChannel();
35                 words.addInterceptor(keyInterceptor);
36                 return words;
37         }
38
39         @Bean
40         ChannelInterceptor keyInterceptor ()
41         {
42                 return new ChannelInterceptor()
43                 {
44                         @Override
45                         public Message<?> preSend(Message<?> message, MessageChannel channel)
46                         {
47                                 MessageHeaders headers = message.getHeaders();
48                                 Object key = headers.get(KafkaHeaders.RECEIVED_MESSAGE_KEY);
49                                 return
50                                                 MessageBuilder
51                                                                 .fromMessage(message)
52                                                                 .setHeader(KafkaHeaders.MESSAGE_KEY, key)
53                                                                 .build();
54                         }
55                 };
56         }
57
58         @InboundChannelAdapter(channel = "recordings")
59         @Bean
60         KafkaMessageSource<String, String> source(
61                         ConsumerFactory<String, String> cf,
62                         SplitterApplicationProperties properties)
63         {
64                 return new KafkaMessageSource<>(cf, new ConsumerProperties(properties.getInputTopic()));
65         }
66
67         @Bean
68         @ServiceActivator(inputChannel = "words")
69         MessageHandler handler(
70                         KafkaTemplate<String, String> kafkaTemplate,
71                         SplitterApplicationProperties properties)
72         {
73                 KafkaProducerMessageHandler<String, String> handler =
74                                 new KafkaProducerMessageHandler<>(kafkaTemplate);
75                 handler.setTopicExpression(new LiteralExpression(properties.getOutputTopic()));
76                 return handler;
77         }
78
79         public static void main(String[] args)
80         {
81                 SpringApplication.run(SplitterApplication.class, args);
82         }
83 }