splitter: 1.0.0-spring-ingetration - kafka-backed channels
[demos/kafka/wordcount] / src / main / java / de / juplo / kafka / wordcount / splitter / SplitterApplication.java
1 package de.juplo.kafka.wordcount.splitter;
2
3 import org.springframework.boot.SpringApplication;
4 import org.springframework.boot.autoconfigure.SpringBootApplication;
5 import org.springframework.boot.context.properties.EnableConfigurationProperties;
6 import org.springframework.context.annotation.Bean;
7 import org.springframework.expression.Expression;
8 import org.springframework.expression.ExpressionParser;
9 import org.springframework.expression.common.LiteralExpression;
10 import org.springframework.expression.spel.standard.SpelExpressionParser;
11 import org.springframework.integration.annotation.ServiceActivator;
12 import org.springframework.integration.annotation.Transformer;
13 import org.springframework.integration.config.EnableIntegration;
14 import org.springframework.integration.core.MessageProducer;
15 import org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter;
16 import org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler;
17 import org.springframework.integration.transformer.HeaderEnricher;
18 import org.springframework.integration.transformer.support.ExpressionEvaluatingHeaderValueMessageProcessor;
19 import org.springframework.integration.transformer.support.HeaderValueMessageProcessor;
20 import org.springframework.kafka.core.ConsumerFactory;
21 import org.springframework.kafka.core.KafkaTemplate;
22 import org.springframework.kafka.listener.ContainerProperties;
23 import org.springframework.kafka.listener.KafkaMessageListenerContainer;
24 import org.springframework.integration.kafka.channel.SubscribableKafkaChannel;
25 import org.springframework.kafka.config.KafkaListenerContainerFactory;
26 import org.springframework.kafka.listener.AbstractMessageListenerContainer;
27 import org.springframework.messaging.MessageHandler;
28
29 import java.util.HashMap;
30 import java.util.Map;
31
32
33 @SpringBootApplication
34 @EnableConfigurationProperties(SplitterApplicationProperties.class)
35 @EnableIntegration
36 public class SplitterApplication
37 {
38         @Bean
39         MessageProducer messageProducer(
40                         KafkaMessageListenerContainer<String, String> messageListenerContainer)
41         {
42                 KafkaMessageDrivenChannelAdapter<String, String> adapter =
43                                 new KafkaMessageDrivenChannelAdapter(messageListenerContainer);
44                 adapter.setOutputChannelName("kafka-in");
45                 return adapter;
46         }
47
48         @Bean
49         @Transformer(inputChannel = "kafka-in", outputChannel = "recordings")
50         public HeaderEnricher kafkaInHeaderEnricher()
51         {
52                 Map<String, HeaderValueMessageProcessor<?>> headersToAdd = new HashMap<>();
53                 Expression expression =
54                                 new SpelExpressionParser().parseExpression("headers['kafka_receivedMessageKey']");
55                 headersToAdd.put(
56                                 "key",
57                                 new ExpressionEvaluatingHeaderValueMessageProcessor<>(expression, String.class));
58                 HeaderEnricher enricher = new HeaderEnricher(headersToAdd);
59                 return enricher;
60         }
61
62         @Bean
63         KafkaMessageListenerContainer<String, String> messageListenerContainer(
64                         ConsumerFactory<String,String> consumerFactory,
65                         SplitterApplicationProperties properties)
66         {
67                 return
68                                 new KafkaMessageListenerContainer<>(
69                                                 consumerFactory,
70                                                 new ContainerProperties(properties.getInputTopic()));
71         }
72
73         @Bean
74         SubscribableKafkaChannel recordings(
75                         KafkaTemplate<String, String> kafkaTemplate,
76                         KafkaListenerContainerFactory<AbstractMessageListenerContainer<String, String>> containerFactory)
77         {
78                 SubscribableKafkaChannel channel =
79                                 new SubscribableKafkaChannel(kafkaTemplate, containerFactory, "channel-recordings");
80                 channel.setGroupId("recordings");
81                 return channel;
82         }
83
84         @Bean
85         @ServiceActivator(inputChannel = "words")
86         MessageHandler handler(
87                         KafkaTemplate<String, String> kafkaTemplate,
88                         SplitterApplicationProperties properties)
89         {
90                 KafkaProducerMessageHandler<String, String> handler =
91                                 new KafkaProducerMessageHandler<>(kafkaTemplate);
92                 handler.setTopicExpression(new LiteralExpression(properties.getOutputTopic()));
93                 final ExpressionParser parser = new SpelExpressionParser();
94                 Expression expression = parser.parseExpression("headers['key']");
95                 handler.setMessageKeyExpression(expression);
96                 return handler;
97         }
98
99         @Bean
100         SubscribableKafkaChannel words(
101                         KafkaTemplate<String, String> kafkaTemplate,
102                         KafkaListenerContainerFactory<AbstractMessageListenerContainer<String, String>> containerFactory)
103         {
104                 SubscribableKafkaChannel channel =
105                                 new SubscribableKafkaChannel(kafkaTemplate, containerFactory, "channel-words");
106                 channel.setGroupId("words");
107                 return channel;
108         }
109
110         public static void main(String[] args)
111         {
112                 SpringApplication.run(SplitterApplication.class, args);
113         }
114 }