From a4c69f2736204751a70e916daf451ed6eb7b2994 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Fri, 22 Sep 2023 18:20:31 +0200 Subject: [PATCH] fix: `ConsumerTaskRunner` waits until the data-loading is finished --- .../backend/implementation/kafka/ConsumerTaskRunner.java | 8 +++++++- .../kafka/KafkaServicesApplicationRunner.java | 2 +- .../implementation/kafka/KafkaServicesConfiguration.java | 6 ++++-- .../de/juplo/kafka/chat/backend/KafkaConfigurationIT.java | 4 +++- .../implementation/kafka/KafkaChatHomeServiceTest.java | 4 +++- .../chat/backend/implementation/kafka/KafkaTestUtils.java | 2 +- 6 files changed, 19 insertions(+), 7 deletions(-) diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ConsumerTaskRunner.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ConsumerTaskRunner.java index c8600039..983ebd37 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ConsumerTaskRunner.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ConsumerTaskRunner.java @@ -10,6 +10,7 @@ public class ConsumerTaskRunner { private final ConsumerTaskExecutor infoChannelConsumerTaskExecutor; private final ConsumerTaskExecutor dataChannelConsumerTaskExecutor; + private final InfoChannel infoChannel; public void executeConsumerTasks() { @@ -17,9 +18,14 @@ public class ConsumerTaskRunner dataChannelConsumerTaskExecutor.executeConsumerTask(); } - public void joinConsumerTasks() + public void joinConsumerTasks() throws InterruptedException { dataChannelConsumerTaskExecutor.joinConsumerTaskJob(); + while (infoChannel.loadInProgress()) + { + log.info("Waiting for {} to finish loading...", infoChannel); + Thread.sleep(1000); + } infoChannelConsumerTaskExecutor.joinConsumerTaskJob(); } } diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesApplicationRunner.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesApplicationRunner.java index 722508bd..44f411f8 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesApplicationRunner.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesApplicationRunner.java @@ -28,7 +28,7 @@ public class KafkaServicesApplicationRunner implements ApplicationRunner } @PreDestroy - public void joinConsumerTasks() + public void joinConsumerTasks() throws InterruptedException { consumerTaskRunner.joinConsumerTasks(); } diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java index 77955168..cafc7757 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java @@ -39,11 +39,13 @@ public class KafkaServicesConfiguration @Bean ConsumerTaskRunner consumerTaskRunner( ConsumerTaskExecutor infoChannelConsumerTaskExecutor, - ConsumerTaskExecutor dataChannelConsumerTaskExecutor) + ConsumerTaskExecutor dataChannelConsumerTaskExecutor, + InfoChannel infoChannel) { return new ConsumerTaskRunner( infoChannelConsumerTaskExecutor, - dataChannelConsumerTaskExecutor); + dataChannelConsumerTaskExecutor, + infoChannel); } @Bean diff --git a/src/test/java/de/juplo/kafka/chat/backend/KafkaConfigurationIT.java b/src/test/java/de/juplo/kafka/chat/backend/KafkaConfigurationIT.java index d9ed8eb0..e01e012d 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/KafkaConfigurationIT.java +++ b/src/test/java/de/juplo/kafka/chat/backend/KafkaConfigurationIT.java @@ -53,7 +53,9 @@ class KafkaConfigurationIT extends AbstractConfigurationWithShardingIT } @AfterAll - static void joinConsumerTasks(@Autowired ConsumerTaskRunner consumerTaskRunner) + static void joinConsumerTasks( + @Autowired ConsumerTaskRunner consumerTaskRunner) + throws InterruptedException { KafkaTestUtils.joinConsumerTasks(consumerTaskRunner); } diff --git a/src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatHomeServiceTest.java b/src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatHomeServiceTest.java index 394ba1b3..e345a751 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatHomeServiceTest.java +++ b/src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatHomeServiceTest.java @@ -55,7 +55,9 @@ public class KafkaChatHomeServiceTest extends ChatHomeServiceWithShardsTest } @AfterAll - static void joinConsumerTasks(@Autowired ConsumerTaskRunner consumerTaskRunner) + static void joinConsumerTasks( + @Autowired ConsumerTaskRunner consumerTaskRunner) + throws InterruptedException { KafkaTestUtils.joinConsumerTasks(consumerTaskRunner); } diff --git a/src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaTestUtils.java b/src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaTestUtils.java index c6163101..956d7cec 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaTestUtils.java +++ b/src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaTestUtils.java @@ -77,7 +77,7 @@ public class KafkaTestUtils new TopicPartition(result.getRecordMetadata().topic(), result.getRecordMetadata().partition())); } - public static void joinConsumerTasks(ConsumerTaskRunner consumerTaskRunner) + public static void joinConsumerTasks(ConsumerTaskRunner consumerTaskRunner) throws InterruptedException { consumerTaskRunner.joinConsumerTasks(); } -- 2.20.1