From: Kai Moritz Date: Wed, 6 Mar 2024 09:07:53 +0000 (+0100) Subject: refactor: Simplified shutdown - channel-tasks were joined multiple times X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=e15fdb383ec1f34da5c08da17df8d0ee83b3369e;p=demos%2Fkafka%2Fchat refactor: Simplified shutdown - channel-tasks were joined multiple times * `KafkaServicesApplicationRunner` does not have to join the channel-tasks. * The channel-tasks are already joined by `ChannelTaskExecutor.join()` automatically, because the method is annotated with `@PreDestroy`. * Simplified the test-configuration accordingly. --- diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ChannelTaskRunner.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ChannelTaskRunner.java index 5e565281..d329ac6e 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ChannelTaskRunner.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ChannelTaskRunner.java @@ -16,23 +16,4 @@ public class ChannelTaskRunner infoChannelTaskExecutor.executeChannelTask(); dataChannelTaskExecutor.executeChannelTask(); } - - public void joinChannels() throws InterruptedException - { - joinChannel(dataChannelTaskExecutor); - joinChannel(infoChannelTaskExecutor); - } - - private void joinChannel( - ChannelTaskExecutor channelTaskExecutor) - throws InterruptedException - { - Channel channel = channelTaskExecutor.getChannel(); - while (channel.getChannelState() != ChannelState.SHUTTING_DOWN) - { - log.info("Waiting for {} to shut down...", channel); - Thread.sleep(1000); - } - channelTaskExecutor.join(); - } } diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesApplicationRunner.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesApplicationRunner.java index 9d8539f3..16b47414 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesApplicationRunner.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesApplicationRunner.java @@ -1,9 +1,7 @@ package de.juplo.kafka.chat.backend.implementation.kafka; -import jakarta.annotation.PreDestroy; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.consumer.Consumer; import org.springframework.boot.ApplicationArguments; import org.springframework.boot.ApplicationRunner; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; @@ -20,8 +18,6 @@ import org.springframework.stereotype.Component; public class KafkaServicesApplicationRunner implements ApplicationRunner { private final ChannelTaskRunner channelTaskRunner; - private final Consumer dataChannelConsumer; - private final Consumer infoChannelConsumer; @Override @@ -30,14 +26,4 @@ public class KafkaServicesApplicationRunner implements ApplicationRunner log.info("Executing channel-tasks"); channelTaskRunner.executeChannels(); } - - @PreDestroy - public void joinChannels() throws InterruptedException - { - log.info("Closing consumers"); - dataChannelConsumer.close(); - infoChannelConsumer.close(); - log.info("Joining channel-tasks"); - channelTaskRunner.joinChannels(); - } } diff --git a/src/test/java/de/juplo/kafka/chat/backend/KafkaConfigurationIT.java b/src/test/java/de/juplo/kafka/chat/backend/KafkaConfigurationIT.java index 0dfba9da..75097dcb 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/KafkaConfigurationIT.java +++ b/src/test/java/de/juplo/kafka/chat/backend/KafkaConfigurationIT.java @@ -2,7 +2,6 @@ package de.juplo.kafka.chat.backend; import de.juplo.kafka.chat.backend.implementation.kafka.*; import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.consumer.Consumer; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.springframework.beans.factory.annotation.Autowired; @@ -55,14 +54,11 @@ class KafkaConfigurationIT extends AbstractConfigurationWithShardingIT @AfterAll static void joinChannels( - @Autowired Consumer dataChannelConsumer, - @Autowired Consumer infoChannelConsumer, - @Autowired ChannelTaskRunner channelTaskRunner) - throws InterruptedException + @Autowired ChannelTaskExecutor dataChannelTaskExecutor, + @Autowired ChannelTaskExecutor infoChannelTaskExecutor) { - dataChannelConsumer.wakeup(); - infoChannelConsumer.wakeup(); - channelTaskRunner.joinChannels(); + dataChannelTaskExecutor.join(); + infoChannelTaskExecutor.join(); } diff --git a/src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatHomeServiceTest.java b/src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatHomeServiceTest.java index a017cf35..72422c89 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatHomeServiceTest.java +++ b/src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatHomeServiceTest.java @@ -2,7 +2,6 @@ package de.juplo.kafka.chat.backend.implementation.kafka; import de.juplo.kafka.chat.backend.domain.ChatHomeServiceWithShardsTest; import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.consumer.Consumer; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.springframework.beans.factory.annotation.Autowired; @@ -56,13 +55,10 @@ public class KafkaChatHomeServiceTest extends ChatHomeServiceWithShardsTest @AfterAll static void joinChannels( - @Autowired Consumer dataChannelConsumer, - @Autowired Consumer infoChannelConsumer, - @Autowired ChannelTaskRunner channelTaskRunner) - throws InterruptedException + @Autowired ChannelTaskExecutor dataChannelTaskExecutor, + @Autowired ChannelTaskExecutor infoChannelTaskExecutor) { - dataChannelConsumer.wakeup(); - infoChannelConsumer.wakeup(); - channelTaskRunner.joinChannels(); + dataChannelTaskExecutor.join(); + infoChannelTaskExecutor.join(); } }