import org.springframework.expression.common.LiteralExpression;
import org.springframework.expression.spel.standard.SpelExpressionParser;
import org.springframework.integration.annotation.ServiceActivator;
+import org.springframework.integration.annotation.Transformer;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter;
import org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler;
+import org.springframework.integration.transformer.HeaderEnricher;
+import org.springframework.integration.transformer.support.ExpressionEvaluatingHeaderValueMessageProcessor;
+import org.springframework.integration.transformer.support.HeaderValueMessageProcessor;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.KafkaMessageListenerContainer;
+import org.springframework.integration.kafka.channel.SubscribableKafkaChannel;
+import org.springframework.kafka.config.KafkaListenerContainerFactory;
+import org.springframework.kafka.listener.AbstractMessageListenerContainer;
import org.springframework.messaging.MessageHandler;
+import java.util.HashMap;
+import java.util.Map;
+
@SpringBootApplication
@EnableConfigurationProperties(SplitterApplicationProperties.class)
{
KafkaMessageDrivenChannelAdapter<String, String> adapter =
new KafkaMessageDrivenChannelAdapter(messageListenerContainer);
- adapter.setOutputChannelName("recordings");
+ adapter.setOutputChannelName("kafka-in");
return adapter;
}
+ @Bean
+ @Transformer(inputChannel = "kafka-in", outputChannel = "recordings")
+ public HeaderEnricher kafkaInHeaderEnricher()
+ {
+ Map<String, HeaderValueMessageProcessor<?>> headersToAdd = new HashMap<>();
+ Expression expression =
+ new SpelExpressionParser().parseExpression("headers['kafka_receivedMessageKey']");
+ headersToAdd.put(
+ "key",
+ new ExpressionEvaluatingHeaderValueMessageProcessor<>(expression, String.class));
+ HeaderEnricher enricher = new HeaderEnricher(headersToAdd);
+ return enricher;
+ }
+
@Bean
KafkaMessageListenerContainer<String, String> messageListenerContainer(
ConsumerFactory<String,String> consumerFactory,
new ContainerProperties(properties.getInputTopic()));
}
+ @Bean
+ SubscribableKafkaChannel recordings(
+ KafkaTemplate<String, String> kafkaTemplate,
+ KafkaListenerContainerFactory<AbstractMessageListenerContainer<String, String>> containerFactory)
+ {
+ SubscribableKafkaChannel channel =
+ new SubscribableKafkaChannel(kafkaTemplate, containerFactory, "channel-recordings");
+ channel.setGroupId("recordings");
+ return channel;
+ }
+
@Bean
@ServiceActivator(inputChannel = "words")
MessageHandler handler(
new KafkaProducerMessageHandler<>(kafkaTemplate);
handler.setTopicExpression(new LiteralExpression(properties.getOutputTopic()));
final ExpressionParser parser = new SpelExpressionParser();
- Expression expression = parser.parseExpression("headers['kafka_receivedMessageKey']");
+ Expression expression = parser.parseExpression("headers['key']");
handler.setMessageKeyExpression(expression);
return handler;
}
+ @Bean
+ SubscribableKafkaChannel words(
+ KafkaTemplate<String, String> kafkaTemplate,
+ KafkaListenerContainerFactory<AbstractMessageListenerContainer<String, String>> containerFactory)
+ {
+ SubscribableKafkaChannel channel =
+ new SubscribableKafkaChannel(kafkaTemplate, containerFactory, "channel-words");
+ channel.setGroupId("words");
+ return channel;
+ }
+
public static void main(String[] args)
{
SpringApplication.run(SplitterApplication.class, args);