From: Kai Moritz Date: Fri, 22 Sep 2023 16:20:31 +0000 (+0200) Subject: fix: `ConsumerTaskRunner` waits until the data-loading is finished X-Git-Tag: rebase--2024-02-20--15-07~40 X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=f604c5ad4ce13cc7ca90816a0ed58b4de4caeec6;p=demos%2Fkafka%2Fchat fix: `ConsumerTaskRunner` waits until the data-loading is finished --- diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ConsumerTaskRunner.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ConsumerTaskRunner.java index c8600039..983ebd37 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ConsumerTaskRunner.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ConsumerTaskRunner.java @@ -10,6 +10,7 @@ public class ConsumerTaskRunner { private final ConsumerTaskExecutor infoChannelConsumerTaskExecutor; private final ConsumerTaskExecutor dataChannelConsumerTaskExecutor; + private final InfoChannel infoChannel; public void executeConsumerTasks() { @@ -17,9 +18,14 @@ public class ConsumerTaskRunner dataChannelConsumerTaskExecutor.executeConsumerTask(); } - public void joinConsumerTasks() + public void joinConsumerTasks() throws InterruptedException { dataChannelConsumerTaskExecutor.joinConsumerTaskJob(); + while (infoChannel.loadInProgress()) + { + log.info("Waiting for {} to finish loading...", infoChannel); + Thread.sleep(1000); + } infoChannelConsumerTaskExecutor.joinConsumerTaskJob(); } } diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesApplicationRunner.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesApplicationRunner.java index 722508bd..44f411f8 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesApplicationRunner.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesApplicationRunner.java @@ -28,7 +28,7 @@ public class KafkaServicesApplicationRunner implements ApplicationRunner } @PreDestroy - public void joinConsumerTasks() + public void joinConsumerTasks() throws InterruptedException { consumerTaskRunner.joinConsumerTasks(); } diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java index 77955168..cafc7757 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java @@ -39,11 +39,13 @@ public class KafkaServicesConfiguration @Bean ConsumerTaskRunner consumerTaskRunner( ConsumerTaskExecutor infoChannelConsumerTaskExecutor, - ConsumerTaskExecutor dataChannelConsumerTaskExecutor) + ConsumerTaskExecutor dataChannelConsumerTaskExecutor, + InfoChannel infoChannel) { return new ConsumerTaskRunner( infoChannelConsumerTaskExecutor, - dataChannelConsumerTaskExecutor); + dataChannelConsumerTaskExecutor, + infoChannel); } @Bean diff --git a/src/test/java/de/juplo/kafka/chat/backend/KafkaConfigurationIT.java b/src/test/java/de/juplo/kafka/chat/backend/KafkaConfigurationIT.java index d9ed8eb0..e01e012d 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/KafkaConfigurationIT.java +++ b/src/test/java/de/juplo/kafka/chat/backend/KafkaConfigurationIT.java @@ -53,7 +53,9 @@ class KafkaConfigurationIT extends AbstractConfigurationWithShardingIT } @AfterAll - static void joinConsumerTasks(@Autowired ConsumerTaskRunner consumerTaskRunner) + static void joinConsumerTasks( + @Autowired ConsumerTaskRunner consumerTaskRunner) + throws InterruptedException { KafkaTestUtils.joinConsumerTasks(consumerTaskRunner); } diff --git a/src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatHomeServiceTest.java b/src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatHomeServiceTest.java index 394ba1b3..e345a751 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatHomeServiceTest.java +++ b/src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatHomeServiceTest.java @@ -55,7 +55,9 @@ public class KafkaChatHomeServiceTest extends ChatHomeServiceWithShardsTest } @AfterAll - static void joinConsumerTasks(@Autowired ConsumerTaskRunner consumerTaskRunner) + static void joinConsumerTasks( + @Autowired ConsumerTaskRunner consumerTaskRunner) + throws InterruptedException { KafkaTestUtils.joinConsumerTasks(consumerTaskRunner); } diff --git a/src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaTestUtils.java b/src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaTestUtils.java index c6163101..956d7cec 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaTestUtils.java +++ b/src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaTestUtils.java @@ -77,7 +77,7 @@ public class KafkaTestUtils new TopicPartition(result.getRecordMetadata().topic(), result.getRecordMetadata().partition())); } - public static void joinConsumerTasks(ConsumerTaskRunner consumerTaskRunner) + public static void joinConsumerTasks(ConsumerTaskRunner consumerTaskRunner) throws InterruptedException { consumerTaskRunner.joinConsumerTasks(); }