@Bean
@Transformer(inputChannel = "kafka-in", outputChannel = "recordings")
- public HeaderEnricher headerEnricher()
+ public HeaderEnricher kafkaInHeaderEnricher()
{
Map<String, HeaderValueMessageProcessor<?>> headersToAdd = new HashMap<>();
Expression expression =
}
@Bean
- @ServiceActivator(inputChannel = "words")
+ @Transformer(inputChannel = "words", outputChannel = "kafka-out")
+ public HeaderEnricher kafkaOutHeaderEnricher()
+ {
+ Map<String, HeaderValueMessageProcessor<?>> headersToAdd = new HashMap<>();
+ Expression expression =
+ new SpelExpressionParser().parseExpression("headers['kafka_receivedMessageKey']");
+ headersToAdd.put(
+ "key",
+ new ExpressionEvaluatingHeaderValueMessageProcessor<>(expression, String.class));
+ HeaderEnricher enricher = new HeaderEnricher(headersToAdd);
+ return enricher;
+ }
+
+ @Bean
+ @ServiceActivator(inputChannel = "kafka-out")
MessageHandler handler(
KafkaTemplate<String, String> kafkaTemplate,
SplitterApplicationProperties properties)