import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ImportResource;
-import org.springframework.expression.Expression;
-import org.springframework.expression.ExpressionParser;
-import org.springframework.expression.common.LiteralExpression;
-import org.springframework.expression.spel.standard.SpelExpressionParser;
import org.springframework.integration.annotation.InboundChannelAdapter;
-import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.kafka.inbound.KafkaMessageSource;
-import org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler;
import org.springframework.kafka.core.ConsumerFactory;
-import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.ConsumerProperties;
-import org.springframework.messaging.MessageHandler;
@SpringBootApplication
return new KafkaMessageSource<>(cf, new ConsumerProperties(properties.getInputTopic()));
}
- @Bean
- @ServiceActivator(inputChannel = "words")
- MessageHandler handler(
- KafkaTemplate<String, String> kafkaTemplate,
- SplitterApplicationProperties properties)
- {
- KafkaProducerMessageHandler<String, String> handler =
- new KafkaProducerMessageHandler<>(kafkaTemplate);
- handler.setTopicExpression(new LiteralExpression(properties.getOutputTopic()));
- final ExpressionParser parser = new SpelExpressionParser();
- Expression expression = parser.parseExpression("headers['kafka_receivedMessageKey']");
- handler.setMessageKeyExpression(expression);
- return handler;
- }
-
public static void main(String[] args)
{
SpringApplication.run(SplitterApplication.class, args);
http://www.springframework.org/schema/integration https://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/integration/kafka http://www.springframework.org/schema/integration/kafka/spring-integration-kafka.xsd">
+ <int-kafka:inbound-channel-adapter
+ id="recordings"
+ consumer-factory="kafkaConsumerFactory"
+ payload-type="java.lang.String"
+ group-id="splitter"
+ channel="recordings" />
+
</beans>