import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ImportResource;
import org.springframework.expression.common.LiteralExpression;
-import org.springframework.integration.annotation.InboundChannelAdapter;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.config.EnableIntegration;
-import org.springframework.integration.kafka.inbound.KafkaMessageSource;
import org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler;
import org.springframework.integration.support.MessageBuilder;
-import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.ConsumerProperties;
import org.springframework.kafka.support.KafkaHeaders;
};
}
- @InboundChannelAdapter(channel = "recordings")
@Bean
- KafkaMessageSource<String, String> source(
- ConsumerFactory<String, String> cf,
- SplitterApplicationProperties properties)
+ ConsumerProperties consumerProperties(SplitterApplicationProperties properties)
{
- return new KafkaMessageSource<>(cf, new ConsumerProperties(properties.getInputTopic()));
+ return new ConsumerProperties(properties.getInputTopic());
}
@Bean
</int:interceptors>
</int:channel>
+ <int-kafka:inbound-channel-adapter
+ id="recordings"
+ consumer-factory="kafkaConsumerFactory"
+ payload-type="java.lang.String"
+ group-id="splitter"
+ channel="recordings" />
+
</beans>