import org.springframework.expression.ExpressionParser;
import org.springframework.expression.common.LiteralExpression;
import org.springframework.expression.spel.standard.SpelExpressionParser;
-import org.springframework.integration.annotation.InboundChannelAdapter;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.config.EnableIntegration;
-import org.springframework.integration.kafka.inbound.KafkaMessageSource;
+import org.springframework.integration.core.MessageProducer;
+import org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter;
import org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.KafkaTemplate;
-import org.springframework.kafka.listener.ConsumerProperties;
+import org.springframework.kafka.listener.ContainerProperties;
+import org.springframework.kafka.listener.KafkaMessageListenerContainer;
import org.springframework.messaging.MessageHandler;
@EnableIntegration
public class SplitterApplication
{
- @InboundChannelAdapter(channel = "recordings")
@Bean
- KafkaMessageSource<String, String> source(
- ConsumerFactory<String, String> cf,
+ MessageProducer messageProducer(
+ KafkaMessageListenerContainer<String, String> messageListenerContainer)
+ {
+ KafkaMessageDrivenChannelAdapter<String, String> adapter =
+ new KafkaMessageDrivenChannelAdapter(messageListenerContainer);
+ adapter.setOutputChannelName("recordings");
+ return adapter;
+ }
+
+ @Bean
+ KafkaMessageListenerContainer<String, String> messageListenerContainer(
+ ConsumerFactory<String,String> consumerFactory,
SplitterApplicationProperties properties)
{
- return new KafkaMessageSource<>(cf, new ConsumerProperties(properties.getInputTopic()));
+ return
+ new KafkaMessageListenerContainer<>(
+ consumerFactory,
+ new ContainerProperties(properties.getInputTopic()));
}
@Bean