}
@Bean
- @Transformer(inputChannel = "words", outputChannel = "kafka-out")
- public HeaderEnricher kafkaOutHeaderEnricher()
- {
- Map<String, HeaderValueMessageProcessor<?>> headersToAdd = new HashMap<>();
- Expression expression =
- new SpelExpressionParser().parseExpression("headers['kafka_receivedMessageKey']");
- headersToAdd.put(
- "key",
- new ExpressionEvaluatingHeaderValueMessageProcessor<>(expression, String.class));
- HeaderEnricher enricher = new HeaderEnricher(headersToAdd);
- return enricher;
- }
-
- @Bean
- @ServiceActivator(inputChannel = "kafka-out")
+ @ServiceActivator(inputChannel = "words")
MessageHandler handler(
KafkaTemplate<String, String> kafkaTemplate,
SplitterApplicationProperties properties)
new KafkaProducerMessageHandler<>(kafkaTemplate);
handler.setTopicExpression(new LiteralExpression(properties.getOutputTopic()));
final ExpressionParser parser = new SpelExpressionParser();
- Expression expression = parser.parseExpression("headers['kafka_receivedMessageKey']");
+ Expression expression = parser.parseExpression("headers['key']");
handler.setMessageKeyExpression(expression);
return handler;
}