import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.ImportResource;
import org.springframework.expression.common.LiteralExpression;
import org.springframework.integration.annotation.InboundChannelAdapter;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.kafka.inbound.KafkaMessageSource;
import org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler;
+import org.springframework.integration.support.MessageBuilder;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.ConsumerProperties;
+import org.springframework.kafka.support.KafkaHeaders;
+import org.springframework.messaging.Message;
+import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
+import org.springframework.messaging.MessageHeaders;
+import org.springframework.messaging.support.ChannelInterceptor;
@SpringBootApplication
@EnableConfigurationProperties(SplitterApplicationProperties.class)
@EnableIntegration
+@ImportResource("classpath:integration.xml")
public class SplitterApplication
{
+ @Bean
+ ChannelInterceptor keyInterceptor ()
+ {
+ return new ChannelInterceptor()
+ {
+ @Override
+ public Message<?> preSend(Message<?> message, MessageChannel channel)
+ {
+ MessageHeaders headers = message.getHeaders();
+ Object key = headers.get(KafkaHeaders.RECEIVED_MESSAGE_KEY);
+ return
+ MessageBuilder
+ .fromMessage(message)
+ .setHeader(KafkaHeaders.MESSAGE_KEY, key)
+ .build();
+ }
+ };
+ }
+
@InboundChannelAdapter(channel = "recordings")
@Bean
KafkaMessageSource<String, String> source(