splitter: 1.0.0-spring-integration - Initial implementation (incomplete)
[demos/kafka/wordcount] / src / main / java / de / juplo / kafka / wordcount / splitter / SplitterApplication.java
1 package de.juplo.kafka.wordcount.splitter;
2
3 import org.springframework.boot.SpringApplication;
4 import org.springframework.boot.autoconfigure.SpringBootApplication;
5 import org.springframework.boot.context.properties.EnableConfigurationProperties;
6 import org.springframework.context.annotation.Bean;
7 import org.springframework.expression.common.LiteralExpression;
8 import org.springframework.integration.annotation.InboundChannelAdapter;
9 import org.springframework.integration.annotation.ServiceActivator;
10 import org.springframework.integration.config.EnableIntegration;
11 import org.springframework.integration.kafka.inbound.KafkaMessageSource;
12 import org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler;
13 import org.springframework.kafka.core.ConsumerFactory;
14 import org.springframework.kafka.core.KafkaTemplate;
15 import org.springframework.kafka.listener.ConsumerProperties;
16 import org.springframework.messaging.MessageHandler;
17
18
19 @SpringBootApplication
20 @EnableConfigurationProperties(SplitterApplicationProperties.class)
21 @EnableIntegration
22 public class SplitterApplication
23 {
24         @InboundChannelAdapter(channel = "recordings")
25         @Bean
26         KafkaMessageSource<String, String> source(
27                         ConsumerFactory<String, String> cf,
28                         SplitterApplicationProperties properties)
29         {
30                 return new KafkaMessageSource<>(cf, new ConsumerProperties(properties.getInputTopic()));
31         }
32
33         @Bean
34         @ServiceActivator(inputChannel = "words")
35         MessageHandler handler(
36                         KafkaTemplate<String, String> kafkaTemplate,
37                         SplitterApplicationProperties properties)
38         {
39                 KafkaProducerMessageHandler<String, String> handler =
40                                 new KafkaProducerMessageHandler<>(kafkaTemplate);
41                 handler.setTopicExpression(new LiteralExpression(properties.getOutputTopic()));
42                 return handler;
43         }
44
45         public static void main(String[] args)
46         {
47                 SpringApplication.run(SplitterApplication.class, args);
48         }
49 }