WIP:ALIGN
authorKai Moritz <kai@juplo.de>
Thu, 14 Sep 2023 22:40:45 +0000 (00:40 +0200)
committerKai Moritz <kai@juplo.de>
Thu, 14 Sep 2023 22:40:45 +0000 (00:40 +0200)
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ConsumerTaskRunner.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesApplicationRunner.java
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java

diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ConsumerTaskRunner.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ConsumerTaskRunner.java
new file mode 100644 (file)
index 0000000..afc5c2f
--- /dev/null
@@ -0,0 +1,34 @@
+package de.juplo.kafka.chat.backend.implementation.kafka;
+
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
+
+@RequiredArgsConstructor
+@Slf4j
+public class ConsumerTaskRunner
+{
+  private final ConsumerTaskExecutor infoChannelConsumerTaskExecutor;
+  private final ConsumerTaskExecutor dataChannelConsumerTaskExecutor;
+
+  private final InfoChannel infoChannel;
+
+  void run() throws InterruptedException
+  {
+    infoChannelConsumerTaskExecutor.executeConsumerTask();
+
+    while (infoChannel.loadInProgress())
+    {
+      log.info("InfoChannel is still loading...");
+      Thread.sleep(1000);
+    }
+
+    dataChannelConsumerTaskExecutor.executeConsumerTask();
+  }
+
+  void joinTasks()
+  {
+    dataChannelConsumerTaskExecutor.joinConsumerTaskJob();
+    infoChannelConsumerTaskExecutor.joinConsumerTaskJob();
+  }
+}
index 6810d06..936f81f 100644 (file)
@@ -1,9 +1,8 @@
 package de.juplo.kafka.chat.backend.implementation.kafka;
 
+import jakarta.annotation.PreDestroy;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.kafka.common.TopicPartition;
 import org.springframework.boot.ApplicationArguments;
 import org.springframework.boot.ApplicationRunner;
 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
@@ -19,72 +18,18 @@ import org.springframework.stereotype.Component;
 @Slf4j
 public class KafkaServicesApplicationRunner implements ApplicationRunner
 {
-  private final String infoTopic;
-  private final ThreadPoolTaskExecutor taskExecutor;
-  private final InfoChannel infoChannel;
-  private final DataChannel dataChannel;
-  private final Consumer<String, AbstractMessageTo> infoChannelConsumer;
-  private final Consumer<String, AbstractMessageTo> dataChannelConsumer;
-  private final WorkAssignor workAssignor;
-
-  CompletableFuture<Void> infoChannelConsumerJob;
-  CompletableFuture<Void> dataChannelConsumerJob;
->>>>>>> 7fb62d3 (WIP:ALIGN)
+  private final ConsumerTaskRunner consumerTaskRunner;
 
 
   @Override
   public void run(ApplicationArguments args) throws Exception
   {
-<<<<<<< HEAD
-    chatRoomChannelTaskExecutor.executeConsumerTask();
-=======
-    List<TopicPartition> partitions = infoChannelConsumer
-        .partitionsFor(infoTopic)
-        .stream()
-        .map(partitionInfo -> new TopicPartition(
-            infoTopic,
-            partitionInfo.partition()))
-        .toList();
-    infoChannelConsumer.assign(partitions);
-    log.info("Starting the consumer for the InfoChannel");
-    infoChannelConsumerJob = taskExecutor
-        .submitCompletable(infoChannel)
-        .exceptionally(e ->
-        {
-          log.error("The consumer for the InfoChannel exited abnormally!", e);
-          return null;
-        });
-
-    while (infoChannel.loadInProgress())
-    {
-      log.info("InfoChannel is still loading...");
-      Thread.sleep(1000);
-    }
-
-    workAssignor.assignWork(dataChannelConsumer);
-    log.info("Starting the consumer for the DataChannel");
-    dataChannelConsumerJob = taskExecutor
-        .submitCompletable(dataChannel)
-        .exceptionally(e ->
-        {
-          log.error("The consumer for the DataChannel exited abnormally!", e);
-          return null;
-        });
+    consumerTaskRunner.run();
   }
 
   @PreDestroy
-  public void joinChatRoomChannelConsumerJob()
-  {
-    log.info("Signaling the consumer of the CahtRoomChannel to quit its work");
-    infoChannelConsumer.wakeup();
-    log.info("Waiting for the consumer of the ChatRoomChannel to finish its work");
-    dataChannelConsumerJob.join();
-    log.info("Joined the consumer of the ChatRoomChannel");
-  }
-
-
-  interface WorkAssignor
+  public void joinConsumerTasks()
   {
-    void assignWork(Consumer<?, ?> consumer);
+    consumerTaskRunner.joinTasks();
   }
 }
index 58a470d..997b5f1 100644 (file)
@@ -36,7 +36,19 @@ import java.util.Properties;
 public class KafkaServicesConfiguration
 {
   @Bean
-  ConsumerTaskExecutor infoChannelTaskExecutor(
+  ConsumerTaskRunner consumerTaskRunner(
+      ConsumerTaskExecutor infoChannelConsumerTaskExecutor,
+      ConsumerTaskExecutor dataChannelConsumerTaskExecutor,
+      InfoChannel infoChannel)
+  {
+    return new ConsumerTaskRunner(
+        infoChannelConsumerTaskExecutor,
+        dataChannelConsumerTaskExecutor,
+        infoChannel);
+  }
+
+  @Bean
+  ConsumerTaskExecutor infoChannelConsumerTaskExecutor(
       ThreadPoolTaskExecutor taskExecutor,
       InfoChannel infoChannel,
       Consumer<String, AbstractMessageTo> dataChannelConsumer,
@@ -50,7 +62,7 @@ public class KafkaServicesConfiguration
   }
 
   @Bean
-  ConsumerTaskExecutor dataChannelTaskExecutor(
+  ConsumerTaskExecutor dataChannelConsumerTaskExecutor(
       ThreadPoolTaskExecutor taskExecutor,
       DataChannel dataChannel,
       Consumer<String, AbstractMessageTo> dataChannelConsumer,