fix: `ConsumerTaskRunner` waits until the data-loading is finished
authorKai Moritz <kai@juplo.de>
Fri, 22 Sep 2023 16:20:31 +0000 (18:20 +0200)
committerKai Moritz <kai@juplo.de>
Sun, 28 Jan 2024 17:59:08 +0000 (18:59 +0100)
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ConsumerTaskRunner.java
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesApplicationRunner.java
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java
src/test/java/de/juplo/kafka/chat/backend/KafkaConfigurationIT.java
src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatHomeServiceTest.java
src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaTestUtils.java

index c860003..983ebd3 100644 (file)
@@ -10,6 +10,7 @@ public class ConsumerTaskRunner
 {
   private final ConsumerTaskExecutor infoChannelConsumerTaskExecutor;
   private final ConsumerTaskExecutor dataChannelConsumerTaskExecutor;
+  private final InfoChannel infoChannel;
 
   public void executeConsumerTasks()
   {
@@ -17,9 +18,14 @@ public class ConsumerTaskRunner
     dataChannelConsumerTaskExecutor.executeConsumerTask();
   }
 
-  public void joinConsumerTasks()
+  public void joinConsumerTasks() throws InterruptedException
   {
     dataChannelConsumerTaskExecutor.joinConsumerTaskJob();
+    while (infoChannel.loadInProgress())
+    {
+      log.info("Waiting for {} to finish loading...", infoChannel);
+      Thread.sleep(1000);
+    }
     infoChannelConsumerTaskExecutor.joinConsumerTaskJob();
   }
 }
index 722508b..44f411f 100644 (file)
@@ -28,7 +28,7 @@ public class KafkaServicesApplicationRunner implements ApplicationRunner
   }
 
   @PreDestroy
-  public void joinConsumerTasks()
+  public void joinConsumerTasks() throws InterruptedException
   {
     consumerTaskRunner.joinConsumerTasks();
   }
index 7795516..cafc775 100644 (file)
@@ -39,11 +39,13 @@ public class KafkaServicesConfiguration
   @Bean
   ConsumerTaskRunner consumerTaskRunner(
       ConsumerTaskExecutor infoChannelConsumerTaskExecutor,
-      ConsumerTaskExecutor dataChannelConsumerTaskExecutor)
+      ConsumerTaskExecutor dataChannelConsumerTaskExecutor,
+      InfoChannel infoChannel)
   {
     return new ConsumerTaskRunner(
         infoChannelConsumerTaskExecutor,
-        dataChannelConsumerTaskExecutor);
+        dataChannelConsumerTaskExecutor,
+        infoChannel);
   }
 
   @Bean
index d9ed8eb..e01e012 100644 (file)
@@ -53,7 +53,9 @@ class KafkaConfigurationIT extends AbstractConfigurationWithShardingIT
   }
 
   @AfterAll
-  static void joinConsumerTasks(@Autowired ConsumerTaskRunner consumerTaskRunner)
+  static void joinConsumerTasks(
+      @Autowired ConsumerTaskRunner consumerTaskRunner)
+      throws InterruptedException
   {
     KafkaTestUtils.joinConsumerTasks(consumerTaskRunner);
   }
index 394ba1b..e345a75 100644 (file)
@@ -55,7 +55,9 @@ public class KafkaChatHomeServiceTest extends ChatHomeServiceWithShardsTest
   }
 
   @AfterAll
-  static void joinConsumerTasks(@Autowired ConsumerTaskRunner consumerTaskRunner)
+  static void joinConsumerTasks(
+      @Autowired ConsumerTaskRunner consumerTaskRunner)
+      throws InterruptedException
   {
     KafkaTestUtils.joinConsumerTasks(consumerTaskRunner);
   }
index c616310..956d7ce 100644 (file)
@@ -77,7 +77,7 @@ public class KafkaTestUtils
         new TopicPartition(result.getRecordMetadata().topic(), result.getRecordMetadata().partition()));
   }
 
-  public static void joinConsumerTasks(ConsumerTaskRunner consumerTaskRunner)
+  public static void joinConsumerTasks(ConsumerTaskRunner consumerTaskRunner) throws InterruptedException
   {
     consumerTaskRunner.joinConsumerTasks();
   }