- MessageChannel words()
- {
- DirectChannel words = new DirectChannel();
- words.addInterceptor(new ChannelInterceptor()
- {
- @Override
- public Message<?> preSend(Message<?> message, MessageChannel channel)
- {
- MessageHeaders headers = message.getHeaders();
- Object key = headers.get(KafkaHeaders.RECEIVED_MESSAGE_KEY);
- return
- MessageBuilder
- .fromMessage(message)
- .setHeader(KafkaHeaders.MESSAGE_KEY, key)
- .build();
- }
- });
- return words;
- }
-
- @InboundChannelAdapter(channel = "recordings")
- @Bean
- KafkaMessageSource<String, String> source(
- ConsumerFactory<String, String> cf,
- SplitterApplicationProperties properties)
+ SubscribableChannel recordings(
+ KafkaTemplate<String, String> kafkaTemplate,
+ KafkaListenerContainerFactory<AbstractMessageListenerContainer<String, String>> containerFactory,
+ SplitterApplicationProperties properties,
+ @Value("${spring.kafka.consumer.group-id}") String groupId)