From 1d6467721f7eb38fef2de1c355e25643c5156eaf Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Fri, 15 Sep 2023 00:45:00 +0200 Subject: [PATCH] WIP:ALIGN --- .../kafka/ConsumerTaskRunner.java | 2 +- .../kafka/KafkaServicesApplicationRunner.java | 2 +- .../kafka/KafkaChatHomeServiceTest.java | 25 +++++-------------- 3 files changed, 8 insertions(+), 21 deletions(-) diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ConsumerTaskRunner.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ConsumerTaskRunner.java index afc5c2f0..b505ffb7 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ConsumerTaskRunner.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ConsumerTaskRunner.java @@ -26,7 +26,7 @@ public class ConsumerTaskRunner dataChannelConsumerTaskExecutor.executeConsumerTask(); } - void joinTasks() + void joinConsumerTasks() { dataChannelConsumerTaskExecutor.joinConsumerTaskJob(); infoChannelConsumerTaskExecutor.joinConsumerTaskJob(); diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesApplicationRunner.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesApplicationRunner.java index 936f81fd..d0151cac 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesApplicationRunner.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesApplicationRunner.java @@ -30,6 +30,6 @@ public class KafkaServicesApplicationRunner implements ApplicationRunner @PreDestroy public void joinConsumerTasks() { - consumerTaskRunner.joinTasks(); + consumerTaskRunner.joinConsumerTasks(); } } diff --git a/src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatHomeServiceTest.java b/src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatHomeServiceTest.java index 7e54ee32..577355e5 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatHomeServiceTest.java +++ b/src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatHomeServiceTest.java @@ -18,12 +18,9 @@ import org.springframework.context.annotation.Import; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.support.SendResult; import org.springframework.kafka.test.context.EmbeddedKafka; -import org.testcontainers.shaded.org.awaitility.Awaitility; import java.time.Clock; -import java.time.Duration; import java.util.List; -import java.util.UUID; import static de.juplo.kafka.chat.backend.domain.ChatHomeServiceWithShardsTest.NUM_SHARDS; import static de.juplo.kafka.chat.backend.implementation.kafka.KafkaChatHomeServiceTest.DATA_TOPIC; @@ -62,23 +59,13 @@ public class KafkaChatHomeServiceTest extends ChatHomeServiceWithShardsTest static class KafkaChatHomeTestConfiguration { @Bean - ConsumerTaskExecutor.WorkAssignor workAssignor( - InfoChannel infoChannel, - DataChannel dataChannel) + ConsumerTaskExecutor.WorkAssignor workAssignor() { return consumer -> { - // TODO: Darauf warten, dass der Chat-Room erzeugt wurde! - Awaitility - .await() - .atMost(Duration.ofSeconds(10)) - .until(() -> infoChannel - .getChatRoomInfo(UUID.fromString("5c73531c-6fc4-426c-adcb-afc5c140a0f7")) - .block() != null); List assignedPartitions = List.of(new TopicPartition(DATA_TOPIC, 2)); consumer.assign(assignedPartitions); - dataChannel.onPartitionsAssigned(assignedPartitions); }; } @@ -92,8 +79,8 @@ public class KafkaChatHomeServiceTest extends ChatHomeServiceWithShardsTest @BeforeAll public static void sendAndLoadStoredData( - @Autowired ConsumerTaskExecutor consumerTaskExecutor, - @Autowired KafkaTemplate messageTemplate) + @Autowired ConsumerTaskRunner consumerTaskRunner, + @Autowired KafkaTemplate messageTemplate) throws InterruptedException { send(messageTemplate, INFO_TOPIC, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\": \"5c73531c-6fc4-426c-adcb-afc5c140a0f7\", \"shard\": 2, \"name\": \"FOO\" }", "event_chatroom_created"); send(messageTemplate, DATA_TOPIC, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"peter\", \"text\" : \"Hallo, ich heiße Peter!\" }", "event_chatmessage_received"); @@ -101,7 +88,7 @@ public class KafkaChatHomeServiceTest extends ChatHomeServiceWithShardsTest send(messageTemplate, DATA_TOPIC, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 2, \"user\" : \"peter\", \"text\" : \"Willst du mit mir gehen?\" }", "event_chatmessage_received"); send(messageTemplate, DATA_TOPIC, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"klaus\", \"text\" : \"Ja? Nein? Vielleicht??\" }", "event_chatmessage_received"); - consumerTaskExecutor.executeConsumerTask(); + consumerTaskRunner.run(); } static void send( @@ -122,8 +109,8 @@ public class KafkaChatHomeServiceTest extends ChatHomeServiceWithShardsTest } @AfterAll - static void joinConsumerJob(@Autowired ConsumerTaskExecutor consumerTaskExecutor) + static void joinConsumerTasks(@Autowired ConsumerTaskRunner consumerTaskRunner) { - consumerTaskExecutor.joinConsumerTaskJob(); + consumerTaskRunner.joinConsumerTasks(); } } -- 2.20.1