]> juplo.de Git - demos/kafka/chat/commitdiff
WIP:refactor: Refined channel-states, introduced `ChannelState` -- ALIGN
authorKai Moritz <kai@juplo.de>
Mon, 4 Mar 2024 14:49:08 +0000 (15:49 +0100)
committerKai Moritz <kai@juplo.de>
Mon, 4 Mar 2024 14:49:08 +0000 (15:49 +0100)
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ChannelTaskRunner.java
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesApplicationRunner.java
src/test/java/de/juplo/kafka/chat/backend/KafkaConfigurationIT.java
src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatHomeServiceTest.java
src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaTestUtils.java

index 9e8576d0cb70e3d0041dfa967d8f0de063e6d634..0cbc06bb2ac56b45db2dbb182ba7c93f3db74716 100644 (file)
@@ -17,7 +17,7 @@ public class ChannelTaskRunner
     dataChannelTaskExecutor.executeConsumerTask();
   }
 
-  public void joinChannel() throws InterruptedException
+  public void joinChannels() throws InterruptedException
   {
     joinChannel(dataChannelTaskExecutor);
     joinChannel(infoChannelTaskExecutor);
index 2cb3971b16ec09e276f98f56b74cef39a56d60dd..97ecea2ffbc644af80b213a405a69025b7e87737 100644 (file)
@@ -28,6 +28,6 @@ public class KafkaServicesApplicationRunner implements ApplicationRunner
   @PreDestroy
   public void joinConsumerTasks() throws InterruptedException
   {
-    channelTaskRunner.joinChannel();
+    channelTaskRunner.joinChannels();
   }
 }
index e54fac4e67868354f3fb0dfd825bcd3b6def2c79..86cfa301ed1c759a1ee922dd718b07311b2221cc 100644 (file)
@@ -57,7 +57,7 @@ class KafkaConfigurationIT extends AbstractConfigurationWithShardingIT
       @Autowired ChannelTaskRunner channelTaskRunner)
       throws InterruptedException
   {
-    KafkaTestUtils.joinConsumerTasks(channelTaskRunner);
+    channelTaskRunner.joinChannels();
   }
 
 
index 2ec28265322763d25c16e8113a081749f2a0dc39..ab1cce9c4d6658e96076a8920775b80d3d5c921b 100644 (file)
@@ -2,6 +2,7 @@ package de.juplo.kafka.chat.backend.implementation.kafka;
 
 import de.juplo.kafka.chat.backend.domain.ChatHomeServiceWithShardsTest;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.Consumer;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeAll;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -55,9 +56,13 @@ public class KafkaChatHomeServiceTest extends ChatHomeServiceWithShardsTest
 
   @AfterAll
   static void joinConsumerTasks(
+      @Autowired Consumer dataChannelConsumer,
+      @Autowired Consumer infoChannelConsumer,
       @Autowired ChannelTaskRunner channelTaskRunner)
       throws InterruptedException
   {
-    KafkaTestUtils.joinConsumerTasks(channelTaskRunner);
+    dataChannelConsumer.wakeup();
+    infoChannelConsumer.wakeup();
+    channelTaskRunner.joinChannels();
   }
 }
index ca30f667a8222520cd22bec03ccc4ceffb9ae4dc..1da6158bfe9bdc72f456c39dd9513c39a1df0e2f 100644 (file)
@@ -71,9 +71,4 @@ public abstract class KafkaTestUtils
         value,
         new TopicPartition(result.getRecordMetadata().topic(), result.getRecordMetadata().partition()));
   }
-
-  public static void joinConsumerTasks(ChannelTaskRunner channelTaskRunner) throws InterruptedException
-  {
-    channelTaskRunner.joinChannel();
-  }
 }