+ KafkaMessageDrivenChannelAdapter<String, String> adapter =
+ new KafkaMessageDrivenChannelAdapter(messageListenerContainer);
+ adapter.setOutputChannelName("kafka-in");
+ return adapter;
+ }
+
+ @Bean
+ @Transformer(inputChannel = "kafka-in", outputChannel = "recordings")
+ public HeaderEnricher kafkaInHeaderEnricher()
+ {
+ Map<String, HeaderValueMessageProcessor<?>> headersToAdd = new HashMap<>();
+ Expression expression =
+ new SpelExpressionParser().parseExpression("headers['kafka_receivedMessageKey']");
+ headersToAdd.put(
+ "key",
+ new ExpressionEvaluatingHeaderValueMessageProcessor<>(expression, String.class));
+ HeaderEnricher enricher = new HeaderEnricher(headersToAdd);
+ return enricher;
+ }
+
+ @Bean
+ KafkaMessageListenerContainer<String, String> messageListenerContainer(
+ ConsumerFactory<String,String> consumerFactory,
+ SplitterApplicationProperties properties)
+ {
+ return
+ new KafkaMessageListenerContainer<>(
+ consumerFactory,
+ new ContainerProperties(properties.getInputTopic()));
+ }
+
+ @Bean
+ SubscribableKafkaChannel recordings(
+ KafkaTemplate<String, String> kafkaTemplate,
+ KafkaListenerContainerFactory<AbstractMessageListenerContainer<String, String>> containerFactory,
+ ChannelInterceptor messageKeyInterceptor)
+ {
+ SubscribableKafkaChannel channel =
+ new SubscribableKafkaChannel(kafkaTemplate, containerFactory, "channel-recordings");
+ channel.setGroupId("recordings");
+ List<ChannelInterceptor> interceptors = new ArrayList<>(1);
+ interceptors.add(messageKeyInterceptor);
+ channel.setInterceptors(interceptors);
+ return channel;
+ }
+
+ @Bean
+ @ServiceActivator(inputChannel = "words")
+ MessageHandler handler(
+ KafkaTemplate<String, String> kafkaTemplate,
+ SplitterApplicationProperties properties)
+ {
+ KafkaProducerMessageHandler<String, String> handler =
+ new KafkaProducerMessageHandler<>(kafkaTemplate);
+ handler.setTopicExpression(new LiteralExpression(properties.getOutputTopic()));
+ final ExpressionParser parser = new SpelExpressionParser();
+ Expression expression = parser.parseExpression("headers['key']");
+ handler.setMessageKeyExpression(expression);
+ return handler;
+ }
+
+ @Bean
+ SubscribableKafkaChannel words(
+ KafkaTemplate<String, String> kafkaTemplate,
+ KafkaListenerContainerFactory<AbstractMessageListenerContainer<String, String>> containerFactory,
+ ChannelInterceptor messageKeyInterceptor)
+ {
+ SubscribableKafkaChannel channel =
+ new SubscribableKafkaChannel(kafkaTemplate, containerFactory, "channel-words");
+ channel.setGroupId("words");
+ List<ChannelInterceptor> interceptors = new ArrayList<>(1);
+ interceptors.add(messageKeyInterceptor);
+ channel.setInterceptors(interceptors);
+ return channel;
+ }
+
+ @Bean
+ MessageChannel messageLog()
+ {
+ return new DirectChannel();
+ }
+
+ @Bean
+ ChannelInterceptor messageKeyInterceptor()
+ {
+ return new ChannelInterceptor() {
+ @Override
+ public Message<?> preSend(Message<?> message, MessageChannel channel)
+ {
+ String key = message.getHeaders().get("key", String.class);
+ return MessageBuilder
+ .fromMessage(message)
+ .setHeader("kafka_messageKey", key)
+ .build();
+ }
+ };
+ }
+
+ @GlobalChannelInterceptor
+ @Bean
+ ChannelInterceptor globalLoggingWireTap(MessageChannel messageLog)
+ {
+ return new WireTap(messageLog);
+ }
+
+ @Bean
+ @ServiceActivator(inputChannel = "messageLog")
+ public LoggingHandler logging()
+ {
+ LoggingHandler adapter = new LoggingHandler(LoggingHandler.Level.DEBUG);
+ adapter.setLoggerName("MESSAGE_LOG");
+ adapter.setLogExpressionString("headers.id + ': ' + payload + ', headers=' + headers");
+ return adapter;