+ ConsumerTaskExecutor chatRoomChannelTaskExecutor(
+ ThreadPoolTaskExecutor taskExecutor,
+ ChatRoomChannel chatRoomChannel,
+ Consumer<String, AbstractMessageTo> chatRoomChannelConsumer,
+ ConsumerTaskExecutor.WorkAssignor workAssignor)
+ {
+ return new ConsumerTaskExecutor(
+ taskExecutor,
+ chatRoomChannel,
+ chatRoomChannelConsumer,
+ workAssignor);
+ }
+
+ @Bean
+ ConsumerTaskExecutor.WorkAssignor workAssignor(
+ ChatBackendProperties properties,
+ ChatRoomChannel chatRoomChannel)
+ {
+ return consumer ->
+ {
+ List<String> topics =
+ List.of(properties.getKafka().getChatRoomChannelTopic());
+ consumer.subscribe(topics, chatRoomChannel);
+ };
+ }
+
+ @Bean
+ ChatHomeService kafkaChatHome(