import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
+import org.springframework.expression.Expression;
+import org.springframework.expression.ExpressionParser;
import org.springframework.expression.common.LiteralExpression;
+import org.springframework.expression.spel.standard.SpelExpressionParser;
import org.springframework.integration.annotation.InboundChannelAdapter;
import org.springframework.integration.annotation.ServiceActivator;
-import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.kafka.inbound.KafkaMessageSource;
import org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler;
-import org.springframework.integration.support.MessageBuilder;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.ConsumerProperties;
-import org.springframework.kafka.support.KafkaHeaders;
-import org.springframework.messaging.Message;
-import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
-import org.springframework.messaging.MessageHeaders;
-import org.springframework.messaging.support.ChannelInterceptor;
@SpringBootApplication
@EnableIntegration
public class SplitterApplication
{
- @Bean
- MessageChannel words(ChannelInterceptor keyInterceptor)
- {
- DirectChannel words = new DirectChannel();
- words.addInterceptor(keyInterceptor);
- return words;
- }
-
- @Bean
- ChannelInterceptor keyInterceptor ()
- {
- return new ChannelInterceptor()
- {
- @Override
- public Message<?> preSend(Message<?> message, MessageChannel channel)
- {
- MessageHeaders headers = message.getHeaders();
- Object key = headers.get(KafkaHeaders.RECEIVED_MESSAGE_KEY);
- return
- MessageBuilder
- .fromMessage(message)
- .setHeader(KafkaHeaders.MESSAGE_KEY, key)
- .build();
- }
- };
- }
-
@InboundChannelAdapter(channel = "recordings")
@Bean
KafkaMessageSource<String, String> source(
KafkaProducerMessageHandler<String, String> handler =
new KafkaProducerMessageHandler<>(kafkaTemplate);
handler.setTopicExpression(new LiteralExpression(properties.getOutputTopic()));
+ final ExpressionParser parser = new SpelExpressionParser();
+ Expression expression = parser.parseExpression("headers['kafka_receivedMessageKey']");
+ handler.setMessageKeyExpression(expression);
return handler;
}