refactor: Simplified shutdown - channel-tasks were joined multiple times
authorKai Moritz <kai@juplo.de>
Wed, 6 Mar 2024 09:07:53 +0000 (10:07 +0100)
committerKai Moritz <kai@juplo.de>
Thu, 14 Mar 2024 08:11:21 +0000 (09:11 +0100)
* `KafkaServicesApplicationRunner` does not have to join the channel-tasks.
* The channel-tasks are already joined by `ChannelTaskExecutor.join()`
  automatically, because the method is annotated with `@PreDestroy`.
* Simplified the test-configuration accordingly.

src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ChannelTaskRunner.java
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesApplicationRunner.java
src/test/java/de/juplo/kafka/chat/backend/KafkaConfigurationIT.java
src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatHomeServiceTest.java

index 5e56528..d329ac6 100644 (file)
@@ -16,23 +16,4 @@ public class ChannelTaskRunner
     infoChannelTaskExecutor.executeChannelTask();
     dataChannelTaskExecutor.executeChannelTask();
   }
-
-  public void joinChannels() throws InterruptedException
-  {
-    joinChannel(dataChannelTaskExecutor);
-    joinChannel(infoChannelTaskExecutor);
-  }
-
-  private void joinChannel(
-      ChannelTaskExecutor channelTaskExecutor)
-      throws InterruptedException
-  {
-    Channel channel = channelTaskExecutor.getChannel();
-    while (channel.getChannelState() != ChannelState.SHUTTING_DOWN)
-    {
-      log.info("Waiting for {} to shut down...", channel);
-      Thread.sleep(1000);
-    }
-    channelTaskExecutor.join();
-  }
 }
index 9d8539f..16b4741 100644 (file)
@@ -1,9 +1,7 @@
 package de.juplo.kafka.chat.backend.implementation.kafka;
 
-import jakarta.annotation.PreDestroy;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.consumer.Consumer;
 import org.springframework.boot.ApplicationArguments;
 import org.springframework.boot.ApplicationRunner;
 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
@@ -20,8 +18,6 @@ import org.springframework.stereotype.Component;
 public class KafkaServicesApplicationRunner implements ApplicationRunner
 {
   private final ChannelTaskRunner channelTaskRunner;
-  private final Consumer dataChannelConsumer;
-  private final Consumer infoChannelConsumer;
 
 
   @Override
@@ -30,14 +26,4 @@ public class KafkaServicesApplicationRunner implements ApplicationRunner
     log.info("Executing channel-tasks");
     channelTaskRunner.executeChannels();
   }
-
-  @PreDestroy
-  public void joinChannels() throws InterruptedException
-  {
-    log.info("Closing consumers");
-    dataChannelConsumer.close();
-    infoChannelConsumer.close();
-    log.info("Joining channel-tasks");
-    channelTaskRunner.joinChannels();
-  }
 }
index 0dfba9d..75097dc 100644 (file)
@@ -2,7 +2,6 @@ package de.juplo.kafka.chat.backend;
 
 import de.juplo.kafka.chat.backend.implementation.kafka.*;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.consumer.Consumer;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeAll;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -55,14 +54,11 @@ class KafkaConfigurationIT extends AbstractConfigurationWithShardingIT
 
   @AfterAll
   static void joinChannels(
-      @Autowired Consumer dataChannelConsumer,
-      @Autowired Consumer infoChannelConsumer,
-      @Autowired ChannelTaskRunner channelTaskRunner)
-      throws InterruptedException
+      @Autowired ChannelTaskExecutor dataChannelTaskExecutor,
+      @Autowired ChannelTaskExecutor infoChannelTaskExecutor)
   {
-    dataChannelConsumer.wakeup();
-    infoChannelConsumer.wakeup();
-    channelTaskRunner.joinChannels();
+    dataChannelTaskExecutor.join();
+    infoChannelTaskExecutor.join();
   }
 
 
index a017cf3..72422c8 100644 (file)
@@ -2,7 +2,6 @@ package de.juplo.kafka.chat.backend.implementation.kafka;
 
 import de.juplo.kafka.chat.backend.domain.ChatHomeServiceWithShardsTest;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.consumer.Consumer;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeAll;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -56,13 +55,10 @@ public class KafkaChatHomeServiceTest extends ChatHomeServiceWithShardsTest
 
   @AfterAll
   static void joinChannels(
-      @Autowired Consumer dataChannelConsumer,
-      @Autowired Consumer infoChannelConsumer,
-      @Autowired ChannelTaskRunner channelTaskRunner)
-      throws InterruptedException
+      @Autowired ChannelTaskExecutor dataChannelTaskExecutor,
+      @Autowired ChannelTaskExecutor infoChannelTaskExecutor)
   {
-    dataChannelConsumer.wakeup();
-    infoChannelConsumer.wakeup();
-    channelTaskRunner.joinChannels();
+    dataChannelTaskExecutor.join();
+    infoChannelTaskExecutor.join();
   }
 }