import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.KafkaMessageListenerContainer;
+import org.springframework.integration.kafka.channel.SubscribableKafkaChannel;
+import org.springframework.kafka.config.KafkaListenerContainerFactory;
+import org.springframework.kafka.listener.AbstractMessageListenerContainer;
import org.springframework.messaging.MessageHandler;
import java.util.HashMap;
new ContainerProperties(properties.getInputTopic()));
}
+ @Bean
+ SubscribableKafkaChannel recordings(
+ KafkaTemplate<String, String> kafkaTemplate,
+ KafkaListenerContainerFactory<AbstractMessageListenerContainer<String, String>> containerFactory)
+ {
+ SubscribableKafkaChannel channel =
+ new SubscribableKafkaChannel(kafkaTemplate, containerFactory, "channel-recordings");
+ channel.setGroupId("recordings");
+ return channel;
+ }
+
@Bean
@ServiceActivator(inputChannel = "words")
MessageHandler handler(
return handler;
}
+ @Bean
+ SubscribableKafkaChannel words(
+ KafkaTemplate<String, String> kafkaTemplate,
+ KafkaListenerContainerFactory<AbstractMessageListenerContainer<String, String>> containerFactory)
+ {
+ SubscribableKafkaChannel channel =
+ new SubscribableKafkaChannel(kafkaTemplate, containerFactory, "channel-words");
+ channel.setGroupId("words");
+ return channel;
+ }
+
public static void main(String[] args)
{
SpringApplication.run(SplitterApplication.class, args);