1 package de.juplo.kafka.wordcount.splitter;
3 import org.springframework.beans.factory.annotation.Value;
4 import org.springframework.boot.SpringApplication;
5 import org.springframework.boot.autoconfigure.SpringBootApplication;
6 import org.springframework.boot.context.properties.EnableConfigurationProperties;
7 import org.springframework.context.annotation.Bean;
8 import org.springframework.expression.Expression;
9 import org.springframework.expression.ExpressionParser;
10 import org.springframework.expression.common.LiteralExpression;
11 import org.springframework.expression.spel.standard.SpelExpressionParser;
12 import org.springframework.integration.annotation.ServiceActivator;
13 import org.springframework.integration.config.EnableIntegration;
14 import org.springframework.integration.kafka.channel.SubscribableKafkaChannel;
15 import org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler;
16 import org.springframework.kafka.config.KafkaListenerContainerFactory;
17 import org.springframework.kafka.core.KafkaTemplate;
18 import org.springframework.kafka.listener.AbstractMessageListenerContainer;
19 import org.springframework.messaging.MessageHandler;
20 import org.springframework.messaging.SubscribableChannel;
23 @SpringBootApplication
24 @EnableConfigurationProperties(SplitterApplicationProperties.class)
26 public class SplitterApplication
// Creates a SubscribableKafkaChannel for the input topic configured in
// SplitterApplicationProperties.
//
// Parameters:
//   kafkaTemplate    - template passed to the channel constructor
//   containerFactory - listener container factory passed to the channel constructor
//   properties       - supplies the topic name through getInputTopic()
//   groupId          - injected from the property spring.kafka.consumer.group-id
//
// NOTE(review): the return statement is not visible in this excerpt; presumably the
// configured channel is what gets returned as the SubscribableChannel -- confirm.
29 SubscribableChannel recordings(
30 KafkaTemplate<String, String> kafkaTemplate,
31 KafkaListenerContainerFactory<AbstractMessageListenerContainer<String, String>> containerFactory,
32 SplitterApplicationProperties properties,
33 @Value("${spring.kafka.consumer.group-id}") String groupId)
35 SubscribableKafkaChannel channel =
36 new SubscribableKafkaChannel(kafkaTemplate, containerFactory, properties.getInputTopic());
// Set the group id on the channel to the value injected from the configuration.
37 channel.setGroupId(groupId);
// Builds the outbound Kafka handler that is registered as service activator for the
// input channel "words".
//
// Parameters:
//   kafkaTemplate - template the KafkaProducerMessageHandler is constructed with
//   properties    - supplies the target topic name through getOutputTopic()
//
// NOTE(review): the return statement is not visible in this excerpt; presumably the
// configured handler is returned as the MessageHandler -- confirm.
42 @ServiceActivator(inputChannel = "words")
43 MessageHandler handler(
44 KafkaTemplate<String, String> kafkaTemplate,
45 SplitterApplicationProperties properties)
47 KafkaProducerMessageHandler<String, String> handler =
48 new KafkaProducerMessageHandler<>(kafkaTemplate);
// The topic is a fixed value, so it is wrapped in a LiteralExpression instead of being parsed as SpEL.
49 handler.setTopicExpression(new LiteralExpression(properties.getOutputTopic()));
// The message key is evaluated per message from the header "kafka_receivedMessageKey".
// NOTE(review): presumably this header carries the key of the consumed record, so that
// outgoing messages keep the key of the incoming one -- verify against the Spring Kafka headers.
50 final ExpressionParser parser = new SpelExpressionParser();
51 Expression expression = parser.parseExpression("headers['kafka_receivedMessageKey']");
52 handler.setMessageKeyExpression(expression);
// Application entry point: hands SplitterApplication and the command line
// arguments to SpringApplication.run.
56 public static void main(String[] args)
58 SpringApplication.run(SplitterApplication.class, args);