From 7d9620a27bd4cd8dfb50d8c853f0709c8a55ba16 Mon Sep 17 00:00:00 2001
From: Kai Moritz <kai@juplo.de>
Date: Fri, 22 Sep 2023 18:20:31 +0200
Subject: [PATCH] fix: `ConsumerTaskRunner` waits until the data-loading is
 finished

---
 .../backend/implementation/kafka/ConsumerTaskRunner.java  | 8 +++++++-
 .../kafka/KafkaServicesApplicationRunner.java             | 2 +-
 .../implementation/kafka/KafkaServicesConfiguration.java  | 6 ++++--
 .../de/juplo/kafka/chat/backend/KafkaConfigurationIT.java | 4 +++-
 .../implementation/kafka/KafkaChatHomeServiceTest.java    | 4 +++-
 .../chat/backend/implementation/kafka/KafkaTestUtils.java | 2 +-
 6 files changed, 19 insertions(+), 7 deletions(-)

diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ConsumerTaskRunner.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ConsumerTaskRunner.java
index c8600039..983ebd37 100644
--- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ConsumerTaskRunner.java
+++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ConsumerTaskRunner.java
@@ -10,6 +10,7 @@ public class ConsumerTaskRunner
 {
   private final ConsumerTaskExecutor infoChannelConsumerTaskExecutor;
   private final ConsumerTaskExecutor dataChannelConsumerTaskExecutor;
+  private final InfoChannel infoChannel;
 
   public void executeConsumerTasks()
   {
@@ -17,9 +18,14 @@ public class ConsumerTaskRunner
     dataChannelConsumerTaskExecutor.executeConsumerTask();
   }
 
-  public void joinConsumerTasks()
+  public void joinConsumerTasks() throws InterruptedException
   {
     dataChannelConsumerTaskExecutor.joinConsumerTaskJob();
+    while (infoChannel.loadInProgress())
+    {
+      log.info("Waiting for {} to finish loading...", infoChannel);
+      Thread.sleep(1000);
+    }
     infoChannelConsumerTaskExecutor.joinConsumerTaskJob();
   }
 }
diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesApplicationRunner.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesApplicationRunner.java
index 722508bd..44f411f8 100644
--- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesApplicationRunner.java
+++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesApplicationRunner.java
@@ -28,7 +28,7 @@ public class KafkaServicesApplicationRunner implements ApplicationRunner
   }
 
   @PreDestroy
-  public void joinConsumerTasks()
+  public void joinConsumerTasks() throws InterruptedException
   {
     consumerTaskRunner.joinConsumerTasks();
   }
diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java
index 77955168..cafc7757 100644
--- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java
+++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java
@@ -39,11 +39,13 @@ public class KafkaServicesConfiguration
   @Bean
   ConsumerTaskRunner consumerTaskRunner(
       ConsumerTaskExecutor infoChannelConsumerTaskExecutor,
-      ConsumerTaskExecutor dataChannelConsumerTaskExecutor)
+      ConsumerTaskExecutor dataChannelConsumerTaskExecutor,
+      InfoChannel infoChannel)
   {
     return new ConsumerTaskRunner(
         infoChannelConsumerTaskExecutor,
-        dataChannelConsumerTaskExecutor);
+        dataChannelConsumerTaskExecutor,
+        infoChannel);
   }
 
   @Bean
diff --git a/src/test/java/de/juplo/kafka/chat/backend/KafkaConfigurationIT.java b/src/test/java/de/juplo/kafka/chat/backend/KafkaConfigurationIT.java
index d9ed8eb0..e01e012d 100644
--- a/src/test/java/de/juplo/kafka/chat/backend/KafkaConfigurationIT.java
+++ b/src/test/java/de/juplo/kafka/chat/backend/KafkaConfigurationIT.java
@@ -53,7 +53,9 @@ class KafkaConfigurationIT extends AbstractConfigurationWithShardingIT
   }
 
   @AfterAll
-  static void joinConsumerTasks(@Autowired ConsumerTaskRunner consumerTaskRunner)
+  static void joinConsumerTasks(
+      @Autowired ConsumerTaskRunner consumerTaskRunner)
+      throws InterruptedException
   {
     KafkaTestUtils.joinConsumerTasks(consumerTaskRunner);
   }
diff --git a/src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatHomeServiceTest.java b/src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatHomeServiceTest.java
index 394ba1b3..e345a751 100644
--- a/src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatHomeServiceTest.java
+++ b/src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatHomeServiceTest.java
@@ -55,7 +55,9 @@ public class KafkaChatHomeServiceTest extends ChatHomeServiceWithShardsTest
   }
 
   @AfterAll
-  static void joinConsumerTasks(@Autowired ConsumerTaskRunner consumerTaskRunner)
+  static void joinConsumerTasks(
+      @Autowired ConsumerTaskRunner consumerTaskRunner)
+      throws InterruptedException
   {
     KafkaTestUtils.joinConsumerTasks(consumerTaskRunner);
   }
diff --git a/src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaTestUtils.java b/src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaTestUtils.java
index c6163101..956d7cec 100644
--- a/src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaTestUtils.java
+++ b/src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaTestUtils.java
@@ -77,7 +77,7 @@ public class KafkaTestUtils
         new TopicPartition(result.getRecordMetadata().topic(), result.getRecordMetadata().partition()));
   }
 
-  public static void joinConsumerTasks(ConsumerTaskRunner consumerTaskRunner)
+  public static void joinConsumerTasks(ConsumerTaskRunner consumerTaskRunner) throws InterruptedException
   {
     consumerTaskRunner.joinConsumerTasks();
   }
-- 
2.20.1