refactor: Separated channels for data and info -- Moved/copied classes
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / implementation / kafka / KafkaServicesConfiguration.java
index d63111a..cda0b94 100644 (file)
@@ -18,10 +18,12 @@ import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 import org.springframework.kafka.support.serializer.JsonDeserializer;
 import org.springframework.kafka.support.serializer.JsonSerializer;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
 
 import java.time.Clock;
 import java.time.ZoneId;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 
@@ -34,7 +36,34 @@ import java.util.Properties;
 public class KafkaServicesConfiguration
 {
   @Bean
-  ChatHomeService kafkaChatHome(
+  ConsumerTaskExecutor chatRoomChannelTaskExecutor(
+      ThreadPoolTaskExecutor taskExecutor,
+      ChatRoomChannel chatRoomChannel,
+      Consumer<String, AbstractMessageTo> chatRoomChannelConsumer,
+      ConsumerTaskExecutor.WorkAssignor workAssignor)
+  {
+    return new ConsumerTaskExecutor(
+        taskExecutor,
+        chatRoomChannel,
+        chatRoomChannelConsumer,
+        workAssignor);
+  }
+
+  @Bean
+  ConsumerTaskExecutor.WorkAssignor workAssignor(
+      ChatBackendProperties properties,
+      ChatRoomChannel chatRoomChannel)
+  {
+    return consumer ->
+    {
+      List<String> topics =
+          List.of(properties.getKafka().getChatRoomChannelTopic());
+      consumer.subscribe(topics, chatRoomChannel);
+    };
+  }
+
+  @Bean
+    ChatHomeService kafkaChatHome(
       ChatBackendProperties properties,
       ChatRoomChannel chatRoomChannel)
   {