From 2cab41c3879f59c8e92312992047fcad7d3f9291 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Fri, 22 Sep 2023 18:20:31 +0200 Subject: [PATCH] WIP --- .../backend/implementation/kafka/ConsumerTaskRunner.java | 8 +++++++- .../kafka/KafkaServicesApplicationRunner.java | 2 +- .../de/juplo/kafka/chat/backend/KafkaConfigurationIT.java | 4 +++- .../implementation/kafka/KafkaChatHomeServiceTest.java | 4 +++- .../chat/backend/implementation/kafka/KafkaTestUtils.java | 2 +- 5 files changed, 15 insertions(+), 5 deletions(-) diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ConsumerTaskRunner.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ConsumerTaskRunner.java index c8600039..983ebd37 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ConsumerTaskRunner.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ConsumerTaskRunner.java @@ -10,6 +10,7 @@ public class ConsumerTaskRunner { private final ConsumerTaskExecutor infoChannelConsumerTaskExecutor; private final ConsumerTaskExecutor dataChannelConsumerTaskExecutor; + private final InfoChannel infoChannel; public void executeConsumerTasks() { @@ -17,9 +18,14 @@ public class ConsumerTaskRunner dataChannelConsumerTaskExecutor.executeConsumerTask(); } - public void joinConsumerTasks() + public void joinConsumerTasks() throws InterruptedException { dataChannelConsumerTaskExecutor.joinConsumerTaskJob(); + while (infoChannel.loadInProgress()) + { + log.info("Waiting for {} to finish loading...", infoChannel); + Thread.sleep(1000); + } infoChannelConsumerTaskExecutor.joinConsumerTaskJob(); } } diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesApplicationRunner.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesApplicationRunner.java index 722508bd..44f411f8 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesApplicationRunner.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesApplicationRunner.java @@ -28,7 +28,7 @@ public class KafkaServicesApplicationRunner implements ApplicationRunner } @PreDestroy - public void joinConsumerTasks() + public void joinConsumerTasks() throws InterruptedException { consumerTaskRunner.joinConsumerTasks(); } diff --git a/src/test/java/de/juplo/kafka/chat/backend/KafkaConfigurationIT.java b/src/test/java/de/juplo/kafka/chat/backend/KafkaConfigurationIT.java index d9ed8eb0..e01e012d 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/KafkaConfigurationIT.java +++ b/src/test/java/de/juplo/kafka/chat/backend/KafkaConfigurationIT.java @@ -53,7 +53,9 @@ class KafkaConfigurationIT extends AbstractConfigurationWithShardingIT } @AfterAll - static void joinConsumerTasks(@Autowired ConsumerTaskRunner consumerTaskRunner) + static void joinConsumerTasks( + @Autowired ConsumerTaskRunner consumerTaskRunner) + throws InterruptedException { KafkaTestUtils.joinConsumerTasks(consumerTaskRunner); } diff --git a/src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatHomeServiceTest.java b/src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatHomeServiceTest.java index 394ba1b3..e345a751 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatHomeServiceTest.java +++ b/src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatHomeServiceTest.java @@ -55,7 +55,9 @@ public class KafkaChatHomeServiceTest extends ChatHomeServiceWithShardsTest } @AfterAll - static void joinConsumerTasks(@Autowired ConsumerTaskRunner consumerTaskRunner) + static void joinConsumerTasks( + @Autowired ConsumerTaskRunner consumerTaskRunner) + throws InterruptedException { KafkaTestUtils.joinConsumerTasks(consumerTaskRunner); } diff --git a/src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaTestUtils.java b/src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaTestUtils.java index c6163101..956d7cec 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaTestUtils.java +++ b/src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaTestUtils.java @@ -77,7 +77,7 @@ public class KafkaTestUtils new TopicPartition(result.getRecordMetadata().topic(), result.getRecordMetadata().partition())); } - public static void joinConsumerTasks(ConsumerTaskRunner consumerTaskRunner) + public static void joinConsumerTasks(ConsumerTaskRunner consumerTaskRunner) throws InterruptedException { consumerTaskRunner.joinConsumerTasks(); } -- 2.20.1