X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fwordcount%2Fsplitter%2FSplitterApplication.java;h=cfe7a4ade185239abc74c6f3d1c9efbfbdc08a29;hb=bc490d203bb6fbe99c397310d0aa1d3081390f03;hp=246fffd66a37442954eb689ace8c86cd1dd34736;hpb=9f82f66f1a9ed3194889fee1b69a10287c4b03db;p=demos%2Fkafka%2Fwordcount diff --git a/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterApplication.java b/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterApplication.java index 246fffd..cfe7a4a 100644 --- a/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterApplication.java +++ b/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterApplication.java @@ -8,15 +8,36 @@ import org.springframework.expression.Expression; import org.springframework.expression.ExpressionParser; import org.springframework.expression.common.LiteralExpression; import org.springframework.expression.spel.standard.SpelExpressionParser; -import org.springframework.integration.annotation.InboundChannelAdapter; import org.springframework.integration.annotation.ServiceActivator; +import org.springframework.integration.annotation.Transformer; +import org.springframework.integration.channel.DirectChannel; +import org.springframework.integration.channel.interceptor.WireTap; import org.springframework.integration.config.EnableIntegration; -import org.springframework.integration.kafka.inbound.KafkaMessageSource; +import org.springframework.integration.config.GlobalChannelInterceptor; +import org.springframework.integration.core.MessageProducer; +import org.springframework.integration.handler.LoggingHandler; +import org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter; import org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler; +import org.springframework.integration.transformer.HeaderEnricher; +import org.springframework.integration.transformer.support.ExpressionEvaluatingHeaderValueMessageProcessor; +import org.springframework.integration.transformer.support.HeaderValueMessageProcessor; import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.core.KafkaTemplate; -import org.springframework.kafka.listener.ConsumerProperties; +import org.springframework.kafka.listener.ContainerProperties; +import org.springframework.kafka.listener.KafkaMessageListenerContainer; +import org.springframework.integration.kafka.channel.SubscribableKafkaChannel; +import org.springframework.kafka.config.KafkaListenerContainerFactory; +import org.springframework.kafka.listener.AbstractMessageListenerContainer; +import org.springframework.messaging.Message; +import org.springframework.messaging.MessageChannel; import org.springframework.messaging.MessageHandler; +import org.springframework.messaging.support.ChannelInterceptor; +import org.springframework.messaging.support.MessageBuilder; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; @SpringBootApplication @@ -24,13 +45,54 @@ import org.springframework.messaging.MessageHandler; @EnableIntegration public class SplitterApplication { - @InboundChannelAdapter(channel = "recordings") @Bean - KafkaMessageSource source( - ConsumerFactory cf, + MessageProducer messageProducer( + KafkaMessageListenerContainer messageListenerContainer) + { + KafkaMessageDrivenChannelAdapter adapter = + new KafkaMessageDrivenChannelAdapter(messageListenerContainer); + adapter.setOutputChannelName("kafka-in"); + return adapter; + } + + @Bean + @Transformer(inputChannel = "kafka-in", outputChannel = "recordings") + public HeaderEnricher kafkaInHeaderEnricher() + { + Map> headersToAdd = new HashMap<>(); + Expression expression = + new SpelExpressionParser().parseExpression("headers['kafka_receivedMessageKey']"); + headersToAdd.put( + "key", + new ExpressionEvaluatingHeaderValueMessageProcessor<>(expression, String.class)); + HeaderEnricher enricher = new HeaderEnricher(headersToAdd); + return enricher; + } + + @Bean + KafkaMessageListenerContainer messageListenerContainer( + ConsumerFactory consumerFactory, SplitterApplicationProperties properties) { - return new KafkaMessageSource<>(cf, new ConsumerProperties(properties.getInputTopic())); + return + new KafkaMessageListenerContainer<>( + consumerFactory, + new ContainerProperties(properties.getInputTopic())); + } + + @Bean + SubscribableKafkaChannel recordings( + KafkaTemplate kafkaTemplate, + KafkaListenerContainerFactory> containerFactory, + ChannelInterceptor messageKeyInterceptor) + { + SubscribableKafkaChannel channel = + new SubscribableKafkaChannel(kafkaTemplate, containerFactory, "channel-recordings"); + channel.setGroupId("recordings"); + List interceptors = new ArrayList<>(1); + interceptors.add(messageKeyInterceptor); + channel.setInterceptors(interceptors); + return channel; } @Bean @@ -43,11 +105,66 @@ public class SplitterApplication new KafkaProducerMessageHandler<>(kafkaTemplate); handler.setTopicExpression(new LiteralExpression(properties.getOutputTopic())); final ExpressionParser parser = new SpelExpressionParser(); - Expression expression = parser.parseExpression("headers['kafka_receivedMessageKey']"); + Expression expression = parser.parseExpression("headers['key']"); handler.setMessageKeyExpression(expression); return handler; } + @Bean + SubscribableKafkaChannel words( + KafkaTemplate kafkaTemplate, + KafkaListenerContainerFactory> containerFactory, + ChannelInterceptor messageKeyInterceptor) + { + SubscribableKafkaChannel channel = + new SubscribableKafkaChannel(kafkaTemplate, containerFactory, "channel-words"); + channel.setGroupId("words"); + List interceptors = new ArrayList<>(1); + interceptors.add(messageKeyInterceptor); + channel.setInterceptors(interceptors); + return channel; + } + + @Bean + MessageChannel messageLog() + { + return new DirectChannel(); + } + + @Bean + ChannelInterceptor messageKeyInterceptor() + { + return new ChannelInterceptor() { + @Override + public Message preSend(Message message, MessageChannel channel) + { + String key = message.getHeaders().get("key", String.class); + return MessageBuilder + .fromMessage(message) + .setHeader("kafka_messageKey", key) + .build(); + } + }; + } + + @GlobalChannelInterceptor + @Bean + ChannelInterceptor globalLoggingWireTap(MessageChannel messageLog) + { + return new WireTap(messageLog); + } + + @Bean + @ServiceActivator(inputChannel = "messageLog") + public LoggingHandler logging() + { + LoggingHandler adapter = new LoggingHandler(LoggingHandler.Level.DEBUG); + adapter.setLoggerName("MESSAGE_LOG"); + adapter.setLogExpressionString("headers.id + ': ' + payload + ', headers=' + headers"); + return adapter; + } + + public static void main(String[] args) { SpringApplication.run(SplitterApplication.class, args);