+ ConsumerTaskRunner consumerTaskRunner(
+ ConsumerTaskExecutor infoChannelConsumerTaskExecutor,
+ ConsumerTaskExecutor dataChannelConsumerTaskExecutor)
+ {
+ return new ConsumerTaskRunner(
+ infoChannelConsumerTaskExecutor,
+ dataChannelConsumerTaskExecutor);
+ }
+
+ @Bean
+ ConsumerTaskExecutor infoChannelConsumerTaskExecutor(
+ ThreadPoolTaskExecutor taskExecutor,
+ InfoChannel infoChannel,
+ Consumer<String, AbstractMessageTo> infoChannelConsumer,
+ WorkAssignor infoChannelWorkAssignor)
+ {
+ return new ConsumerTaskExecutor(
+ taskExecutor,
+ infoChannel,
+ infoChannelConsumer,
+ infoChannelWorkAssignor);
+ }
+
+ @Bean
+ WorkAssignor infoChannelWorkAssignor(ChatBackendProperties properties)
+ {
+ return consumer ->
+ {
+ String topic = properties.getKafka().getInfoChannelTopic();
+ List<TopicPartition> partitions = consumer
+ .partitionsFor(topic)
+ .stream()
+ .map(partitionInfo ->
+ new TopicPartition(topic, partitionInfo.partition()))
+ .toList();
+ consumer.assign(partitions);
+ };
+ }
+
+ @Bean
+ ConsumerTaskExecutor dataChannelConsumerTaskExecutor(