WIP:ALIGN
authorKai Moritz <kai@juplo.de>
Thu, 14 Sep 2023 22:45:00 +0000 (00:45 +0200)
committerKai Moritz <kai@juplo.de>
Thu, 14 Sep 2023 22:45:00 +0000 (00:45 +0200)
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ConsumerTaskRunner.java
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesApplicationRunner.java
src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatHomeServiceTest.java

index afc5c2f..b505ffb 100644 (file)
@@ -26,7 +26,7 @@ public class ConsumerTaskRunner
     dataChannelConsumerTaskExecutor.executeConsumerTask();
   }
 
-  void joinTasks()
+  void joinConsumerTasks()
   {
     dataChannelConsumerTaskExecutor.joinConsumerTaskJob();
     infoChannelConsumerTaskExecutor.joinConsumerTaskJob();
index 936f81f..d0151ca 100644 (file)
@@ -30,6 +30,6 @@ public class KafkaServicesApplicationRunner implements ApplicationRunner
   @PreDestroy
   public void joinConsumerTasks()
   {
-    consumerTaskRunner.joinTasks();
+    consumerTaskRunner.joinConsumerTasks();
   }
 }
index 7e54ee3..577355e 100644 (file)
@@ -18,12 +18,9 @@ import org.springframework.context.annotation.Import;
 import org.springframework.kafka.core.KafkaTemplate;
 import org.springframework.kafka.support.SendResult;
 import org.springframework.kafka.test.context.EmbeddedKafka;
-import org.testcontainers.shaded.org.awaitility.Awaitility;
 
 import java.time.Clock;
-import java.time.Duration;
 import java.util.List;
-import java.util.UUID;
 
 import static de.juplo.kafka.chat.backend.domain.ChatHomeServiceWithShardsTest.NUM_SHARDS;
 import static de.juplo.kafka.chat.backend.implementation.kafka.KafkaChatHomeServiceTest.DATA_TOPIC;
@@ -62,23 +59,13 @@ public class KafkaChatHomeServiceTest extends ChatHomeServiceWithShardsTest
   static class KafkaChatHomeTestConfiguration
   {
     @Bean
-    ConsumerTaskExecutor.WorkAssignor workAssignor(
-        InfoChannel infoChannel,
-        DataChannel dataChannel)
+    ConsumerTaskExecutor.WorkAssignor workAssignor()
     {
       return consumer ->
       {
-        // TODO: Darauf warten, dass der Chat-Room erzeugt wurde!
-        Awaitility
-            .await()
-            .atMost(Duration.ofSeconds(10))
-            .until(() -> infoChannel
-                .getChatRoomInfo(UUID.fromString("5c73531c-6fc4-426c-adcb-afc5c140a0f7"))
-                .block() != null);
         List<TopicPartition> assignedPartitions =
             List.of(new TopicPartition(DATA_TOPIC, 2));
         consumer.assign(assignedPartitions);
-        dataChannel.onPartitionsAssigned(assignedPartitions);
       };
     }
 
@@ -92,8 +79,8 @@ public class KafkaChatHomeServiceTest extends ChatHomeServiceWithShardsTest
 
   @BeforeAll
   public static void sendAndLoadStoredData(
-      @Autowired ConsumerTaskExecutor consumerTaskExecutor,
-      @Autowired KafkaTemplate<String, String> messageTemplate)
+      @Autowired ConsumerTaskRunner consumerTaskRunner,
+      @Autowired KafkaTemplate<String, String> messageTemplate) throws InterruptedException
   {
     send(messageTemplate, INFO_TOPIC, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\": \"5c73531c-6fc4-426c-adcb-afc5c140a0f7\", \"shard\": 2, \"name\": \"FOO\" }", "event_chatroom_created");
     send(messageTemplate, DATA_TOPIC, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"peter\", \"text\" : \"Hallo, ich heiße Peter!\" }", "event_chatmessage_received");
@@ -101,7 +88,7 @@ public class KafkaChatHomeServiceTest extends ChatHomeServiceWithShardsTest
     send(messageTemplate, DATA_TOPIC, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 2, \"user\" : \"peter\", \"text\" : \"Willst du mit mir gehen?\" }", "event_chatmessage_received");
     send(messageTemplate, DATA_TOPIC, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"klaus\", \"text\" : \"Ja? Nein? Vielleicht??\" }", "event_chatmessage_received");
 
-    consumerTaskExecutor.executeConsumerTask();
+    consumerTaskRunner.run();
   }
 
   static void send(
@@ -122,8 +109,8 @@ public class KafkaChatHomeServiceTest extends ChatHomeServiceWithShardsTest
   }
 
   @AfterAll
-  static void joinConsumerJob(@Autowired ConsumerTaskExecutor consumerTaskExecutor)
+  static void joinConsumerTasks(@Autowired ConsumerTaskRunner consumerTaskRunner)
   {
-    consumerTaskExecutor.joinConsumerTaskJob();
+    consumerTaskRunner.joinConsumerTasks();
   }
 }