1 package de.juplo.kafka.wordcount.splitter;
3 import org.springframework.boot.SpringApplication;
4 import org.springframework.boot.autoconfigure.SpringBootApplication;
5 import org.springframework.boot.context.properties.EnableConfigurationProperties;
6 import org.springframework.context.annotation.Bean;
7 import org.springframework.context.annotation.ImportResource;
8 import org.springframework.expression.common.LiteralExpression;
9 import org.springframework.integration.annotation.InboundChannelAdapter;
10 import org.springframework.integration.annotation.ServiceActivator;
11 import org.springframework.integration.config.EnableIntegration;
12 import org.springframework.integration.kafka.inbound.KafkaMessageSource;
13 import org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler;
14 import org.springframework.integration.support.MessageBuilder;
15 import org.springframework.kafka.core.ConsumerFactory;
16 import org.springframework.kafka.core.KafkaTemplate;
17 import org.springframework.kafka.listener.ConsumerProperties;
18 import org.springframework.kafka.support.KafkaHeaders;
19 import org.springframework.messaging.Message;
20 import org.springframework.messaging.MessageChannel;
21 import org.springframework.messaging.MessageHandler;
22 import org.springframework.messaging.MessageHeaders;
23 import org.springframework.messaging.support.ChannelInterceptor;
/**
 * Spring Boot application class of the splitter module.
 *
 * <p>Binds {@code SplitterApplicationProperties} as configuration properties and
 * imports further bean definitions from {@code integration.xml} on the classpath.
 * The methods below reference two channels by name: {@code "recordings"}
 * (fed from Kafka) and {@code "words"} (drained to Kafka).
 *
 * <p>NOTE(review): the embedded source numbering in this view skips lines
 * (e.g. 28, 31-32, 42-44, 46-50, 69), so some annotations, braces and parts of
 * statements belonging to this class are not visible here. Comments below only
 * describe what the visible lines show; confirm against the full file.
 */
26 @SpringBootApplication
27 @EnableConfigurationProperties(SplitterApplicationProperties.class)
29 @ImportResource("classpath:integration.xml")
30 public class SplitterApplication
/**
 * Creates a {@link ChannelInterceptor} that handles the Kafka record key
 * of every message passing through it.
 *
 * <p>NOTE(review): which channel(s) this interceptor is attached to is not
 * visible here; presumably that is configured in integration.xml. TODO confirm.
 *
 * @return an anonymous interceptor implementing {@code preSend}
 */
33 ChannelInterceptor keyInterceptor ()
35 return new ChannelInterceptor()
/**
 * Reads the received Kafka key from the incoming message's headers and
 * sets it as the outgoing Kafka key header.
 *
 * @param message the message about to be sent
 * @param channel the channel the message is sent to (not used in the visible lines)
 */
38 public Message<?> preSend(Message<?> message, MessageChannel channel)
40 MessageHeaders headers = message.getHeaders();
// Key of the consumed record, as stored under the "received key" header.
41 Object key = headers.get(KafkaHeaders.RECEIVED_MESSAGE_KEY);
// Re-publish that key under the header name used for outgoing records.
// NOTE(review): the start and end of this statement are on lines not visible
// here; presumably a builder chain based on the original message. TODO confirm.
45 .setHeader(KafkaHeaders.MESSAGE_KEY, key)
/**
 * Creates the Kafka message source that is registered as inbound channel
 * adapter for the {@code "recordings"} channel.
 *
 * @param cf         consumer factory handed to the message source
 * @param properties application properties; supplies the input topic name
 * @return a {@link KafkaMessageSource} consuming from {@code properties.getInputTopic()}
 */
51 @InboundChannelAdapter(channel = "recordings")
53 KafkaMessageSource<String, String> source(
54 ConsumerFactory<String, String> cf,
55 SplitterApplicationProperties properties)
57 return new KafkaMessageSource<>(cf, new ConsumerProperties(properties.getInputTopic()));
/**
 * Creates the message handler that is registered as service activator for
 * the {@code "words"} channel and publishes through the given Kafka template.
 *
 * @param kafkaTemplate template the producer handler is built on
 * @param properties    application properties; supplies the output topic name
 */
61 @ServiceActivator(inputChannel = "words")
62 MessageHandler handler(
63 KafkaTemplate<String, String> kafkaTemplate,
64 SplitterApplicationProperties properties)
66 KafkaProducerMessageHandler<String, String> handler =
67 new KafkaProducerMessageHandler<>(kafkaTemplate);
// The target topic is fixed: a literal expression evaluates to the configured
// output topic name instead of being derived per message.
68 handler.setTopicExpression(new LiteralExpression(properties.getOutputTopic()));
// NOTE(review): the return statement is on a line not visible in this view.
/**
 * Starts the Spring Boot application.
 *
 * @param args command-line arguments, passed through to {@link SpringApplication#run}
 */
72 public static void main(String[] args)
74 SpringApplication.run(SplitterApplication.class, args);