]> juplo.de Git - demos/kafka/chat/commitdiff
fix: `ConsumerTaskRunner` waits until the data-loading is finished
authorKai Moritz <kai@juplo.de>
Fri, 22 Sep 2023 16:20:31 +0000 (18:20 +0200)
committerKai Moritz <kai@juplo.de>
Sun, 28 Jan 2024 17:51:39 +0000 (18:51 +0100)
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ConsumerTaskRunner.java
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesApplicationRunner.java
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java
src/test/java/de/juplo/kafka/chat/backend/KafkaConfigurationIT.java
src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatHomeServiceTest.java
src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaTestUtils.java

index c860003930bd320adc11f00eaf9b38c84be63cab..983ebd374670075c214e6b9e9246d005fd97abe2 100644 (file)
@@ -10,6 +10,7 @@ public class ConsumerTaskRunner
 {
   private final ConsumerTaskExecutor infoChannelConsumerTaskExecutor;
   private final ConsumerTaskExecutor dataChannelConsumerTaskExecutor;
+  private final InfoChannel infoChannel;
 
   public void executeConsumerTasks()
   {
@@ -17,9 +18,14 @@ public class ConsumerTaskRunner
     dataChannelConsumerTaskExecutor.executeConsumerTask();
   }
 
-  public void joinConsumerTasks()
+  public void joinConsumerTasks() throws InterruptedException
   {
     dataChannelConsumerTaskExecutor.joinConsumerTaskJob();
+    while (infoChannel.loadInProgress())
+    {
+      log.info("Waiting for {} to finish loading...", infoChannel);
+      Thread.sleep(1000);
+    }
     infoChannelConsumerTaskExecutor.joinConsumerTaskJob();
   }
 }
index 722508bd596587cbc11d603821448a68ca8aaa48..44f411f80ab4d018c325eebf729bafdebdf7e565 100644 (file)
@@ -28,7 +28,7 @@ public class KafkaServicesApplicationRunner implements ApplicationRunner
   }
 
   @PreDestroy
-  public void joinConsumerTasks()
+  public void joinConsumerTasks() throws InterruptedException
   {
     consumerTaskRunner.joinConsumerTasks();
   }
index 77955168787b9d5d607edd2cf41683fcd0393b0c..cafc775760f27ad4b672f86f3c49d3ffd868bdbb 100644 (file)
@@ -39,11 +39,13 @@ public class KafkaServicesConfiguration
   @Bean
   ConsumerTaskRunner consumerTaskRunner(
       ConsumerTaskExecutor infoChannelConsumerTaskExecutor,
-      ConsumerTaskExecutor dataChannelConsumerTaskExecutor)
+      ConsumerTaskExecutor dataChannelConsumerTaskExecutor,
+      InfoChannel infoChannel)
   {
     return new ConsumerTaskRunner(
         infoChannelConsumerTaskExecutor,
-        dataChannelConsumerTaskExecutor);
+        dataChannelConsumerTaskExecutor,
+        infoChannel);
   }
 
   @Bean
index d9ed8eb09289c79366bc0ba7e8f428ea280f9e38..e01e012d1ba2dd7d8f26fbccafde5a7959bccfd5 100644 (file)
@@ -53,7 +53,9 @@ class KafkaConfigurationIT extends AbstractConfigurationWithShardingIT
   }
 
   @AfterAll
-  static void joinConsumerTasks(@Autowired ConsumerTaskRunner consumerTaskRunner)
+  static void joinConsumerTasks(
+      @Autowired ConsumerTaskRunner consumerTaskRunner)
+      throws InterruptedException
   {
     KafkaTestUtils.joinConsumerTasks(consumerTaskRunner);
   }
index 394ba1b3364f64b509a932ec418742b40d0cccfa..e345a751f73e16bff82264971ebe509cd8cb817f 100644 (file)
@@ -55,7 +55,9 @@ public class KafkaChatHomeServiceTest extends ChatHomeServiceWithShardsTest
   }
 
   @AfterAll
-  static void joinConsumerTasks(@Autowired ConsumerTaskRunner consumerTaskRunner)
+  static void joinConsumerTasks(
+      @Autowired ConsumerTaskRunner consumerTaskRunner)
+      throws InterruptedException
   {
     KafkaTestUtils.joinConsumerTasks(consumerTaskRunner);
   }
index c6163101814123c0482409c3a1c9235dfd373e71..956d7cecf917034d9a0415af421cb207da244f1e 100644 (file)
@@ -77,7 +77,7 @@ public class KafkaTestUtils
         new TopicPartition(result.getRecordMetadata().topic(), result.getRecordMetadata().partition()));
   }
 
-  public static void joinConsumerTasks(ConsumerTaskRunner consumerTaskRunner)
+  public static void joinConsumerTasks(ConsumerTaskRunner consumerTaskRunner) throws InterruptedException
   {
     consumerTaskRunner.joinConsumerTasks();
   }