WIP:refactor: Refined channel-states, introduced `ChannelState` -- ALIGN
authorKai Moritz <kai@juplo.de>
Mon, 4 Mar 2024 14:49:08 +0000 (15:49 +0100)
committerKai Moritz <kai@juplo.de>
Mon, 4 Mar 2024 14:49:08 +0000 (15:49 +0100)
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ChannelTaskRunner.java
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesApplicationRunner.java
src/test/java/de/juplo/kafka/chat/backend/KafkaConfigurationIT.java
src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatHomeServiceTest.java
src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaTestUtils.java

index 9e8576d..0cbc06b 100644 (file)
@@ -17,7 +17,7 @@ public class ChannelTaskRunner
     dataChannelTaskExecutor.executeConsumerTask();
   }
 
-  public void joinChannel() throws InterruptedException
+  public void joinChannels() throws InterruptedException
   {
     joinChannel(dataChannelTaskExecutor);
     joinChannel(infoChannelTaskExecutor);
index 2cb3971..97ecea2 100644 (file)
@@ -28,6 +28,6 @@ public class KafkaServicesApplicationRunner implements ApplicationRunner
   @PreDestroy
   public void joinConsumerTasks() throws InterruptedException
   {
-    channelTaskRunner.joinChannel();
+    channelTaskRunner.joinChannels();
   }
 }
index e54fac4..86cfa30 100644 (file)
@@ -57,7 +57,7 @@ class KafkaConfigurationIT extends AbstractConfigurationWithShardingIT
       @Autowired ChannelTaskRunner channelTaskRunner)
       throws InterruptedException
   {
-    KafkaTestUtils.joinConsumerTasks(channelTaskRunner);
+    channelTaskRunner.joinChannels();
   }
 
 
index 2ec2826..ab1cce9 100644 (file)
@@ -2,6 +2,7 @@ package de.juplo.kafka.chat.backend.implementation.kafka;
 
 import de.juplo.kafka.chat.backend.domain.ChatHomeServiceWithShardsTest;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.Consumer;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeAll;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -55,9 +56,13 @@ public class KafkaChatHomeServiceTest extends ChatHomeServiceWithShardsTest
 
   @AfterAll
   static void joinConsumerTasks(
+      @Autowired Consumer dataChannelConsumer,
+      @Autowired Consumer infoChannelConsumer,
       @Autowired ChannelTaskRunner channelTaskRunner)
       throws InterruptedException
   {
-    KafkaTestUtils.joinConsumerTasks(channelTaskRunner);
+    dataChannelConsumer.wakeup();
+    infoChannelConsumer.wakeup();
+    channelTaskRunner.joinChannels();
   }
 }
index ca30f66..1da6158 100644 (file)
@@ -71,9 +71,4 @@ public abstract class KafkaTestUtils
         value,
         new TopicPartition(result.getRecordMetadata().topic(), result.getRecordMetadata().partition()));
   }
-
-  public static void joinConsumerTasks(ChannelTaskRunner channelTaskRunner) throws InterruptedException
-  {
-    channelTaskRunner.joinChannel();
-  }
 }