From bf6bf0aac726c45893a0977e503b271cc18b9ccc Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Mon, 4 Mar 2024 15:49:08 +0100 Subject: [PATCH] WIP:refactor: Refined channel-states, introduced `ChannelState` -- ALIGN --- .../backend/implementation/kafka/ChannelTaskRunner.java | 2 +- .../kafka/KafkaServicesApplicationRunner.java | 2 +- .../de/juplo/kafka/chat/backend/KafkaConfigurationIT.java | 2 +- .../implementation/kafka/KafkaChatHomeServiceTest.java | 7 ++++++- .../chat/backend/implementation/kafka/KafkaTestUtils.java | 5 ----- 5 files changed, 9 insertions(+), 9 deletions(-) diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ChannelTaskRunner.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ChannelTaskRunner.java index 9e8576d0..0cbc06bb 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ChannelTaskRunner.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ChannelTaskRunner.java @@ -17,7 +17,7 @@ public class ChannelTaskRunner dataChannelTaskExecutor.executeConsumerTask(); } - public void joinChannel() throws InterruptedException + public void joinChannels() throws InterruptedException { joinChannel(dataChannelTaskExecutor); joinChannel(infoChannelTaskExecutor); diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesApplicationRunner.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesApplicationRunner.java index 2cb3971b..97ecea2f 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesApplicationRunner.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesApplicationRunner.java @@ -28,6 +28,6 @@ public class KafkaServicesApplicationRunner implements ApplicationRunner @PreDestroy public void joinConsumerTasks() throws InterruptedException { - channelTaskRunner.joinChannel(); + channelTaskRunner.joinChannels(); } } diff --git a/src/test/java/de/juplo/kafka/chat/backend/KafkaConfigurationIT.java b/src/test/java/de/juplo/kafka/chat/backend/KafkaConfigurationIT.java index e54fac4e..86cfa301 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/KafkaConfigurationIT.java +++ b/src/test/java/de/juplo/kafka/chat/backend/KafkaConfigurationIT.java @@ -57,7 +57,7 @@ class KafkaConfigurationIT extends AbstractConfigurationWithShardingIT @Autowired ChannelTaskRunner channelTaskRunner) throws InterruptedException { - KafkaTestUtils.joinConsumerTasks(channelTaskRunner); + channelTaskRunner.joinChannels(); } diff --git a/src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatHomeServiceTest.java b/src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatHomeServiceTest.java index 2ec28265..ab1cce9c 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatHomeServiceTest.java +++ b/src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatHomeServiceTest.java @@ -2,6 +2,7 @@ package de.juplo.kafka.chat.backend.implementation.kafka; import de.juplo.kafka.chat.backend.domain.ChatHomeServiceWithShardsTest; import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.Consumer; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.springframework.beans.factory.annotation.Autowired; @@ -55,9 +56,13 @@ public class KafkaChatHomeServiceTest extends ChatHomeServiceWithShardsTest @AfterAll static void joinConsumerTasks( + @Autowired Consumer dataChannelConsumer, + @Autowired Consumer infoChannelConsumer, @Autowired ChannelTaskRunner channelTaskRunner) throws InterruptedException { - KafkaTestUtils.joinConsumerTasks(channelTaskRunner); + dataChannelConsumer.wakeup(); + infoChannelConsumer.wakeup(); + channelTaskRunner.joinChannels(); } } diff --git a/src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaTestUtils.java b/src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaTestUtils.java index ca30f667..1da6158b 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaTestUtils.java +++ b/src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaTestUtils.java @@ -71,9 +71,4 @@ public abstract class KafkaTestUtils value, new TopicPartition(result.getRecordMetadata().topic(), result.getRecordMetadata().partition())); } - - public static void joinConsumerTasks(ChannelTaskRunner channelTaskRunner) throws InterruptedException - { - channelTaskRunner.joinChannel(); - } } -- 2.20.1