splitter: 1.0.0-spring-integration - using a channel backed by Kafka
[demos/kafka/wordcount] / src / main / java / de / juplo / kafka / wordcount / splitter / SplitterApplication.java
1 package de.juplo.kafka.wordcount.splitter;
2
3 import org.springframework.beans.factory.annotation.Value;
4 import org.springframework.boot.SpringApplication;
5 import org.springframework.boot.autoconfigure.SpringBootApplication;
6 import org.springframework.boot.context.properties.EnableConfigurationProperties;
7 import org.springframework.context.annotation.Bean;
8 import org.springframework.expression.Expression;
9 import org.springframework.expression.ExpressionParser;
10 import org.springframework.expression.common.LiteralExpression;
11 import org.springframework.expression.spel.standard.SpelExpressionParser;
12 import org.springframework.integration.annotation.ServiceActivator;
13 import org.springframework.integration.config.EnableIntegration;
14 import org.springframework.integration.kafka.channel.SubscribableKafkaChannel;
15 import org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler;
16 import org.springframework.kafka.config.KafkaListenerContainerFactory;
17 import org.springframework.kafka.core.KafkaTemplate;
18 import org.springframework.kafka.listener.AbstractMessageListenerContainer;
19 import org.springframework.messaging.MessageHandler;
20 import org.springframework.messaging.SubscribableChannel;
21
22
23 @SpringBootApplication
24 @EnableConfigurationProperties(SplitterApplicationProperties.class)
25 @EnableIntegration
26 public class SplitterApplication
27 {
28         @Bean
29         SubscribableChannel recordings(
30                         KafkaTemplate<String, String> kafkaTemplate,
31                         KafkaListenerContainerFactory<AbstractMessageListenerContainer<String, String>> containerFactory,
32                         SplitterApplicationProperties properties,
33                         @Value("${spring.kafka.consumer.group-id}") String groupId)
34         {
35                 SubscribableKafkaChannel channel =
36                                 new SubscribableKafkaChannel(kafkaTemplate, containerFactory, properties.getInputTopic());
37                 channel.setGroupId(groupId);
38                 return channel;
39         }
40
41         @Bean
42         @ServiceActivator(inputChannel = "words")
43         MessageHandler handler(
44                         KafkaTemplate<String, String> kafkaTemplate,
45                         SplitterApplicationProperties properties)
46         {
47                 KafkaProducerMessageHandler<String, String> handler =
48                                 new KafkaProducerMessageHandler<>(kafkaTemplate);
49                 handler.setTopicExpression(new LiteralExpression(properties.getOutputTopic()));
50                 final ExpressionParser parser = new SpelExpressionParser();
51                 Expression expression = parser.parseExpression("headers['kafka_receivedMessageKey']");
52                 handler.setMessageKeyExpression(expression);
53                 return handler;
54         }
55
56         public static void main(String[] args)
57         {
58                 SpringApplication.run(SplitterApplication.class, args);
59         }
60 }