+ ConsumerTaskRunner consumerTaskRunner(
+ ConsumerTaskExecutor infoChannelConsumerTaskExecutor,
+ ConsumerTaskExecutor dataChannelConsumerTaskExecutor,
+ InfoChannel infoChannel)
+ {
+ return new ConsumerTaskRunner(
+ infoChannelConsumerTaskExecutor,
+ dataChannelConsumerTaskExecutor,
+ infoChannel);
+ }
+
+ @Bean
+ ConsumerTaskExecutor infoChannelConsumerTaskExecutor(
+ ThreadPoolTaskExecutor taskExecutor,
+ InfoChannel infoChannel,
+ Consumer<String, AbstractMessageTo> infoChannelConsumer,
+ WorkAssignor infoChannelWorkAssignor)
+ {
+ return new ConsumerTaskExecutor(
+ taskExecutor,
+ infoChannel,
+ infoChannelConsumer,
+ infoChannelWorkAssignor);
+ }
+
+ @Bean
+ WorkAssignor infoChannelWorkAssignor(ChatBackendProperties properties)
+ {
+ return consumer ->
+ {
+ String topic = properties.getKafka().getInfoChannelTopic();
+ List<TopicPartition> partitions = consumer
+ .partitionsFor(topic)
+ .stream()
+ .map(partitionInfo ->
+ new TopicPartition(topic, partitionInfo.partition()))
+ .toList();
+ consumer.assign(partitions);
+ };
+ }
+
+ @Bean
+ ConsumerTaskExecutor dataChannelConsumerTaskExecutor(
+ ThreadPoolTaskExecutor taskExecutor,
+ DataChannel dataChannel,
+ Consumer<String, AbstractMessageTo> dataChannelConsumer,
+ WorkAssignor dataChannelWorkAssignor)
+ {
+ return new ConsumerTaskExecutor(
+ taskExecutor,
+ dataChannel,
+ dataChannelConsumer,
+ dataChannelWorkAssignor);
+ }
+
+ @Bean
+ WorkAssignor dataChannelWorkAssignor(
+ ChatBackendProperties properties,
+ DataChannel dataChannel)
+ {
+ return consumer ->
+ {
+ List<String> topics =
+ List.of(properties.getKafka().getDataChannelTopic());
+ consumer.subscribe(topics, dataChannel);
+ };
+ }
+
+ @Bean
+ ChatHomeService kafkaChatHome(