import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
+import org.springframework.expression.Expression;
+import org.springframework.expression.ExpressionParser;
import org.springframework.expression.common.LiteralExpression;
-import org.springframework.integration.annotation.InboundChannelAdapter;
+import org.springframework.expression.spel.standard.SpelExpressionParser;
import org.springframework.integration.annotation.ServiceActivator;
+import org.springframework.integration.annotation.Transformer;
import org.springframework.integration.channel.DirectChannel;
+import org.springframework.integration.channel.interceptor.WireTap;
import org.springframework.integration.config.EnableIntegration;
-import org.springframework.integration.kafka.inbound.KafkaMessageSource;
+import org.springframework.integration.config.GlobalChannelInterceptor;
+import org.springframework.integration.core.MessageProducer;
+import org.springframework.integration.handler.LoggingHandler;
+import org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter;
import org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler;
-import org.springframework.integration.support.MessageBuilder;
+import org.springframework.integration.transformer.HeaderEnricher;
+import org.springframework.integration.transformer.support.ExpressionEvaluatingHeaderValueMessageProcessor;
+import org.springframework.integration.transformer.support.HeaderValueMessageProcessor;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.KafkaTemplate;
-import org.springframework.kafka.listener.ConsumerProperties;
-import org.springframework.kafka.support.KafkaHeaders;
+import org.springframework.kafka.listener.ContainerProperties;
+import org.springframework.kafka.listener.KafkaMessageListenerContainer;
+import org.springframework.integration.kafka.channel.SubscribableKafkaChannel;
+import org.springframework.kafka.config.KafkaListenerContainerFactory;
+import org.springframework.kafka.listener.AbstractMessageListenerContainer;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
-import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.ChannelInterceptor;
+import org.springframework.messaging.support.MessageBuilder;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
@SpringBootApplication
public class SplitterApplication
{
@Bean
- MessageChannel words(ChannelInterceptor keyInterceptor)
+ MessageProducer messageProducer(
+ KafkaMessageListenerContainer<String, String> messageListenerContainer)
{
- DirectChannel words = new DirectChannel();
- words.addInterceptor(keyInterceptor);
- return words;
+ KafkaMessageDrivenChannelAdapter<String, String> adapter =
+ new KafkaMessageDrivenChannelAdapter(messageListenerContainer);
+ adapter.setOutputChannelName("kafka-in");
+ return adapter;
}
@Bean
- ChannelInterceptor keyInterceptor ()
+ @Transformer(inputChannel = "kafka-in", outputChannel = "recordings")
+ public HeaderEnricher kafkaInHeaderEnricher()
{
- return new ChannelInterceptor()
- {
- @Override
- public Message<?> preSend(Message<?> message, MessageChannel channel)
- {
- MessageHeaders headers = message.getHeaders();
- Object key = headers.get(KafkaHeaders.RECEIVED_MESSAGE_KEY);
- return
- MessageBuilder
- .fromMessage(message)
- .setHeader(KafkaHeaders.MESSAGE_KEY, key)
- .build();
- }
- };
+ Map<String, HeaderValueMessageProcessor<?>> headersToAdd = new HashMap<>();
+ Expression expression =
+ new SpelExpressionParser().parseExpression("headers['kafka_receivedMessageKey']");
+ headersToAdd.put(
+ "key",
+ new ExpressionEvaluatingHeaderValueMessageProcessor<>(expression, String.class));
+ HeaderEnricher enricher = new HeaderEnricher(headersToAdd);
+ return enricher;
}
- @InboundChannelAdapter(channel = "recordings")
@Bean
- KafkaMessageSource<String, String> source(
- ConsumerFactory<String, String> cf,
+ KafkaMessageListenerContainer<String, String> messageListenerContainer(
+ ConsumerFactory<String,String> consumerFactory,
SplitterApplicationProperties properties)
{
- return new KafkaMessageSource<>(cf, new ConsumerProperties(properties.getInputTopic()));
+ return
+ new KafkaMessageListenerContainer<>(
+ consumerFactory,
+ new ContainerProperties(properties.getInputTopic()));
+ }
+
+ @Bean
+ SubscribableKafkaChannel recordings(
+ KafkaTemplate<String, String> kafkaTemplate,
+ KafkaListenerContainerFactory<AbstractMessageListenerContainer<String, String>> containerFactory,
+ ChannelInterceptor messageKeyInterceptor)
+ {
+ SubscribableKafkaChannel channel =
+ new SubscribableKafkaChannel(kafkaTemplate, containerFactory, "channel-recordings");
+ channel.setGroupId("recordings");
+ List<ChannelInterceptor> interceptors = new ArrayList<>(1);
+ interceptors.add(messageKeyInterceptor);
+ channel.setInterceptors(interceptors);
+ return channel;
}
@Bean
KafkaProducerMessageHandler<String, String> handler =
new KafkaProducerMessageHandler<>(kafkaTemplate);
handler.setTopicExpression(new LiteralExpression(properties.getOutputTopic()));
+ final ExpressionParser parser = new SpelExpressionParser();
+ Expression expression = parser.parseExpression("headers['key']");
+ handler.setMessageKeyExpression(expression);
return handler;
}
+ @Bean
+ SubscribableKafkaChannel words(
+ KafkaTemplate<String, String> kafkaTemplate,
+ KafkaListenerContainerFactory<AbstractMessageListenerContainer<String, String>> containerFactory,
+ ChannelInterceptor messageKeyInterceptor)
+ {
+ SubscribableKafkaChannel channel =
+ new SubscribableKafkaChannel(kafkaTemplate, containerFactory, "channel-words");
+ channel.setGroupId("words");
+ List<ChannelInterceptor> interceptors = new ArrayList<>(1);
+ interceptors.add(messageKeyInterceptor);
+ channel.setInterceptors(interceptors);
+ return channel;
+ }
+
+ @Bean
+ MessageChannel messageLog()
+ {
+ return new DirectChannel();
+ }
+
+ @Bean
+ ChannelInterceptor messageKeyInterceptor()
+ {
+ return new ChannelInterceptor() {
+ @Override
+ public Message<?> preSend(Message<?> message, MessageChannel channel)
+ {
+ String key = message.getHeaders().get("key", String.class);
+ return MessageBuilder
+ .fromMessage(message)
+ .setHeader("kafka_messageKey", key)
+ .build();
+ }
+ };
+ }
+
+ @GlobalChannelInterceptor
+ @Bean
+ ChannelInterceptor globalLoggingWireTap(MessageChannel messageLog)
+ {
+ return new WireTap(messageLog);
+ }
+
+ @Bean
+ @ServiceActivator(inputChannel = "messageLog")
+ public LoggingHandler logging()
+ {
+ LoggingHandler adapter = new LoggingHandler(LoggingHandler.Level.DEBUG);
+ adapter.setLoggerName("MESSAGE_LOG");
+ adapter.setLogExpressionString("headers.id + ': ' + payload + ', headers=' + headers");
+ return adapter;
+ }
+
+
public static void main(String[] args)
{
SpringApplication.run(SplitterApplication.class, args);