fix: `ConsumerTaskRunner` waits until the data-loading is finished
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / implementation / kafka / KafkaServicesConfiguration.java
index c4479ce..cafc775 100644 (file)
@@ -1,16 +1,17 @@
-package de.juplo.kafka.chat.backend.persistence.kafka;
+package de.juplo.kafka.chat.backend.implementation.kafka;
 
 import de.juplo.kafka.chat.backend.ChatBackendProperties;
 import de.juplo.kafka.chat.backend.domain.ChatHomeService;
-import de.juplo.kafka.chat.backend.persistence.kafka.messages.AbstractMessageTo;
-import de.juplo.kafka.chat.backend.persistence.kafka.messages.CommandCreateChatRoomTo;
-import de.juplo.kafka.chat.backend.persistence.kafka.messages.EventChatMessageReceivedTo;
+import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo;
+import de.juplo.kafka.chat.backend.implementation.kafka.messages.data.EventChatMessageReceivedTo;
+import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventChatRoomCreated;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
@@ -18,10 +19,12 @@ import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 import org.springframework.kafka.support.serializer.JsonDeserializer;
 import org.springframework.kafka.support.serializer.JsonSerializer;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
 
 import java.time.Clock;
 import java.time.ZoneId;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 
@@ -34,35 +37,121 @@ import java.util.Properties;
 public class KafkaServicesConfiguration
 {
   @Bean
-  ChatHomeService kafkaChatHome(
+  ConsumerTaskRunner consumerTaskRunner(
+      ConsumerTaskExecutor infoChannelConsumerTaskExecutor,
+      ConsumerTaskExecutor dataChannelConsumerTaskExecutor,
+      InfoChannel infoChannel)
+  {
+    return new ConsumerTaskRunner(
+        infoChannelConsumerTaskExecutor,
+        dataChannelConsumerTaskExecutor,
+        infoChannel);
+  }
+
+  @Bean
+  ConsumerTaskExecutor infoChannelConsumerTaskExecutor(
+      ThreadPoolTaskExecutor taskExecutor,
+      InfoChannel infoChannel,
+      Consumer<String, AbstractMessageTo> infoChannelConsumer,
+      WorkAssignor infoChannelWorkAssignor)
+  {
+    return new ConsumerTaskExecutor(
+        taskExecutor,
+        infoChannel,
+        infoChannelConsumer,
+        infoChannelWorkAssignor);
+  }
+
+  @Bean
+  WorkAssignor infoChannelWorkAssignor(ChatBackendProperties properties)
+  {
+    return consumer ->
+    {
+      String topic = properties.getKafka().getInfoChannelTopic();
+      List<TopicPartition> partitions = consumer
+          .partitionsFor(topic)
+          .stream()
+          .map(partitionInfo ->
+              new TopicPartition(topic, partitionInfo.partition()))
+          .toList();
+      consumer.assign(partitions);
+    };
+  }
+
+  @Bean
+  ConsumerTaskExecutor dataChannelConsumerTaskExecutor(
+      ThreadPoolTaskExecutor taskExecutor,
+      DataChannel dataChannel,
+      Consumer<String, AbstractMessageTo> dataChannelConsumer,
+      WorkAssignor dataChannelWorkAssignor)
+  {
+    return new ConsumerTaskExecutor(
+        taskExecutor,
+        dataChannel,
+        dataChannelConsumer,
+        dataChannelWorkAssignor);
+  }
+
+  @Bean
+  WorkAssignor dataChannelWorkAssignor(
       ChatBackendProperties properties,
-      ChatRoomChannel chatRoomChannel)
+      DataChannel dataChannel)
+  {
+    return consumer ->
+    {
+      List<String> topics =
+          List.of(properties.getKafka().getDataChannelTopic());
+      consumer.subscribe(topics, dataChannel);
+    };
+  }
+
+  @Bean
+    ChatHomeService kafkaChatHome(
+      ChatBackendProperties properties,
+      InfoChannel infoChannel,
+      DataChannel dataChannel)
   {
     return new KafkaChatHomeService(
         properties.getKafka().getNumPartitions(),
-        chatRoomChannel);
+        infoChannel,
+        dataChannel);
+  }
+
+  @Bean
+  InfoChannel infoChannel(
+      ChatBackendProperties properties,
+      Producer<String, AbstractMessageTo> producer,
+      Consumer<String, AbstractMessageTo> infoChannelConsumer)
+  {
+    return new InfoChannel(
+        properties.getKafka().getInfoChannelTopic(),
+        producer,
+        infoChannelConsumer,
+        properties.getKafka().getInstanceUri());
   }
 
   @Bean
-  ChatRoomChannel chatRoomChannel(
+  DataChannel dataChannel(
       ChatBackendProperties properties,
-      Producer<String, AbstractMessageTo> chatRoomChannelProducer,
-      Consumer<String, AbstractMessageTo> chatRoomChannelConsumer,
+      Producer<String, AbstractMessageTo> producer,
+      Consumer<String, AbstractMessageTo> dataChannelConsumer,
       ZoneId zoneId,
-      Clock clock)
+      Clock clock,
+      InfoChannel infoChannel)
   {
-    return new ChatRoomChannel(
-        properties.getKafka().getChatRoomChannelTopic(),
-        chatRoomChannelProducer,
-        chatRoomChannelConsumer,
+    return new DataChannel(
+        properties.getKafka().getDataChannelTopic(),
+        producer,
+        dataChannelConsumer,
         zoneId,
         properties.getKafka().getNumPartitions(),
         properties.getChatroomBufferSize(),
-        clock);
+        clock,
+        infoChannel);
   }
 
   @Bean
-  Producer<String, AbstractMessageTo>  chatRoomChannelProducer(
+  Producer<String, AbstractMessageTo>  producer(
       Properties defaultProducerProperties,
       ChatBackendProperties chatBackendProperties,
       StringSerializer stringSerializer,
@@ -72,7 +161,7 @@ public class KafkaServicesConfiguration
     defaultProducerProperties.forEach((key, value) -> properties.put(key.toString(), value));
     properties.put(
         ProducerConfig.CLIENT_ID_CONFIG,
-        chatBackendProperties.getKafka().getClientIdPrefix() + "_CHATROOM_CHANNEL_PRODUCER");
+        chatBackendProperties.getKafka().getClientIdPrefix() + "_PRODUCER");
     return new KafkaProducer<>(
         properties,
         stringSerializer,
@@ -97,7 +186,28 @@ public class KafkaServicesConfiguration
   }
 
   @Bean
-  Consumer<String, AbstractMessageTo>  chatRoomChannelConsumer(
+  Consumer<String, AbstractMessageTo>  infoChannelConsumer(
+      Properties defaultConsumerProperties,
+      ChatBackendProperties chatBackendProperties,
+      StringDeserializer stringDeserializer,
+      JsonDeserializer<AbstractMessageTo> messageDeserializer)
+  {
+    Map<String, Object> properties = new HashMap<>();
+    defaultConsumerProperties.forEach((key, value) -> properties.put(key.toString(), value));
+    properties.put(
+        ConsumerConfig.CLIENT_ID_CONFIG,
+        chatBackendProperties.getKafka().getClientIdPrefix() + "_INFO_CHANNEL_CONSUMER");
+    properties.put(
+        ConsumerConfig.GROUP_ID_CONFIG,
+        "info_channel");
+    return new KafkaConsumer<>(
+        properties,
+        stringDeserializer,
+        messageDeserializer);
+  }
+
+  @Bean
+  Consumer<String, AbstractMessageTo>  dataChannelConsumer(
       Properties defaultConsumerProperties,
       ChatBackendProperties chatBackendProperties,
       StringDeserializer stringDeserializer,
@@ -107,10 +217,10 @@ public class KafkaServicesConfiguration
     defaultConsumerProperties.forEach((key, value) -> properties.put(key.toString(), value));
     properties.put(
         ConsumerConfig.CLIENT_ID_CONFIG,
-        chatBackendProperties.getKafka().getClientIdPrefix() + "_CHATROOM_CHANNEL_CONSUMER");
+        chatBackendProperties.getKafka().getClientIdPrefix() + "_DATA_CHANNEL_CONSUMER");
     properties.put(
         ConsumerConfig.GROUP_ID_CONFIG,
-        "chatroom_channel");
+        "data_channel");
     return new KafkaConsumer<>(
         properties,
         stringDeserializer,
@@ -139,7 +249,7 @@ public class KafkaServicesConfiguration
   String typeMappings ()
   {
     return
-        "command_create_chatroom:" +  CommandCreateChatRoomTo.class.getCanonicalName() + "," +
+        "event_chatroom_created:" +  EventChatRoomCreated.class.getCanonicalName() + "," +
         "event_chatmessage_received:" + EventChatMessageReceivedTo.class.getCanonicalName();
   }