import org.springframework.expression.ExpressionParser;
import org.springframework.expression.common.LiteralExpression;
import org.springframework.expression.spel.standard.SpelExpressionParser;
+import org.springframework.integration.annotation.InboundChannelAdapter;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.kafka.channel.SubscribableKafkaChannel;
+import org.springframework.integration.kafka.inbound.KafkaMessageSource;
import org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
+import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.AbstractMessageListenerContainer;
+import org.springframework.kafka.listener.ConsumerProperties;
import org.springframework.messaging.MessageHandler;
@Bean
SubscribableKafkaChannel recordings(
KafkaTemplate<String, String> kafkaTemplate,
- KafkaListenerContainerFactory<AbstractMessageListenerContainer<String, String>> containerFactory,
- SplitterApplicationProperties properties,
- @Value("${spring.kafka.consumer.group-id}") String groupId)
+ KafkaListenerContainerFactory<AbstractMessageListenerContainer<String, String>> containerFactory)
{
SubscribableKafkaChannel channel =
- new SubscribableKafkaChannel(kafkaTemplate, containerFactory, properties.getInputTopic());
- channel.setGroupId(groupId);
+ new SubscribableKafkaChannel(kafkaTemplate, containerFactory, "channel-recordings");
+ channel.setGroupId("recordings");
return channel;
}
+ @InboundChannelAdapter(channel = "recordings")
+ @Bean
+ KafkaMessageSource<String, String> source(
+ ConsumerFactory<String, String> cf,
+ SplitterApplicationProperties properties)
+ {
+ return new KafkaMessageSource<>(cf, new ConsumerProperties(properties.getInputTopic()));
+ }
+
@Bean
@ServiceActivator(inputChannel = "words")
MessageHandler handler(
return handler;
}
+ @Bean
+ SubscribableKafkaChannel words(
+ KafkaTemplate<String, String> kafkaTemplate,
+ KafkaListenerContainerFactory<AbstractMessageListenerContainer<String, String>> containerFactory)
+ {
+ SubscribableKafkaChannel channel =
+ new SubscribableKafkaChannel(kafkaTemplate, containerFactory, "channel-words");
+ channel.setGroupId("words");
+ return channel;
+ }
+
public static void main(String[] args)
{
SpringApplication.run(SplitterApplication.class, args);