+ KafkaServicesThreadPoolTaskExecutorCustomizer kafkaServicesThreadPoolTaskExecutorCustomizer()
+ {
+ return new KafkaServicesThreadPoolTaskExecutorCustomizer();
+ }
+
+ @Bean(initMethod = "executeChannelTask", destroyMethod = "join")
+ ChannelTaskExecutor infoChannelTaskExecutor(
+ ThreadPoolTaskExecutor taskExecutor,
+ InfoChannel infoChannel,
+ Consumer<String, AbstractMessageTo> infoChannelConsumer,
+ WorkAssignor infoChannelWorkAssignor)
+ {
+ return new ChannelTaskExecutor(
+ taskExecutor,
+ infoChannel,
+ infoChannelConsumer,
+ infoChannelWorkAssignor);
+ }
+
+ @Bean
+ WorkAssignor infoChannelWorkAssignor(ChatBackendProperties properties)
+ {
+ return consumer ->
+ {
+ String topic = properties.getKafka().getInfoChannelTopic();
+ List<TopicPartition> partitions = consumer
+ .partitionsFor(topic)
+ .stream()
+ .map(partitionInfo ->
+ new TopicPartition(topic, partitionInfo.partition()))
+ .toList();
+ consumer.assign(partitions);
+ };
+ }
+
+ @Bean(initMethod = "executeChannelTask", destroyMethod = "join")
+ ChannelTaskExecutor dataChannelTaskExecutor(