From: Kai Moritz Date: Thu, 14 Sep 2023 22:45:00 +0000 (+0200) Subject: WIP:ALIGN X-Git-Tag: rebase--2023-09-15--10-15~8 X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=1d6467721f7eb38fef2de1c355e25643c5156eaf;p=demos%2Fkafka%2Fchat WIP:ALIGN --- diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ConsumerTaskRunner.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ConsumerTaskRunner.java index afc5c2f0..b505ffb7 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ConsumerTaskRunner.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ConsumerTaskRunner.java @@ -26,7 +26,7 @@ public class ConsumerTaskRunner dataChannelConsumerTaskExecutor.executeConsumerTask(); } - void joinTasks() + void joinConsumerTasks() { dataChannelConsumerTaskExecutor.joinConsumerTaskJob(); infoChannelConsumerTaskExecutor.joinConsumerTaskJob(); diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesApplicationRunner.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesApplicationRunner.java index 936f81fd..d0151cac 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesApplicationRunner.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesApplicationRunner.java @@ -30,6 +30,6 @@ public class KafkaServicesApplicationRunner implements ApplicationRunner @PreDestroy public void joinConsumerTasks() { - consumerTaskRunner.joinTasks(); + consumerTaskRunner.joinConsumerTasks(); } } diff --git a/src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatHomeServiceTest.java b/src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatHomeServiceTest.java index 7e54ee32..577355e5 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatHomeServiceTest.java +++ b/src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatHomeServiceTest.java @@ -18,12 +18,9 @@ import org.springframework.context.annotation.Import; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.support.SendResult; import org.springframework.kafka.test.context.EmbeddedKafka; -import org.testcontainers.shaded.org.awaitility.Awaitility; import java.time.Clock; -import java.time.Duration; import java.util.List; -import java.util.UUID; import static de.juplo.kafka.chat.backend.domain.ChatHomeServiceWithShardsTest.NUM_SHARDS; import static de.juplo.kafka.chat.backend.implementation.kafka.KafkaChatHomeServiceTest.DATA_TOPIC; @@ -62,23 +59,13 @@ public class KafkaChatHomeServiceTest extends ChatHomeServiceWithShardsTest static class KafkaChatHomeTestConfiguration { @Bean - ConsumerTaskExecutor.WorkAssignor workAssignor( - InfoChannel infoChannel, - DataChannel dataChannel) + ConsumerTaskExecutor.WorkAssignor workAssignor() { return consumer -> { - // TODO: Darauf warten, dass der Chat-Room erzeugt wurde! - Awaitility - .await() - .atMost(Duration.ofSeconds(10)) - .until(() -> infoChannel - .getChatRoomInfo(UUID.fromString("5c73531c-6fc4-426c-adcb-afc5c140a0f7")) - .block() != null); List assignedPartitions = List.of(new TopicPartition(DATA_TOPIC, 2)); consumer.assign(assignedPartitions); - dataChannel.onPartitionsAssigned(assignedPartitions); }; } @@ -92,8 +79,8 @@ public class KafkaChatHomeServiceTest extends ChatHomeServiceWithShardsTest @BeforeAll public static void sendAndLoadStoredData( - @Autowired ConsumerTaskExecutor consumerTaskExecutor, - @Autowired KafkaTemplate messageTemplate) + @Autowired ConsumerTaskRunner consumerTaskRunner, + @Autowired KafkaTemplate messageTemplate) throws InterruptedException { send(messageTemplate, INFO_TOPIC, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\": \"5c73531c-6fc4-426c-adcb-afc5c140a0f7\", \"shard\": 2, \"name\": \"FOO\" }", "event_chatroom_created"); send(messageTemplate, DATA_TOPIC, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"peter\", \"text\" : \"Hallo, ich heiße Peter!\" }", "event_chatmessage_received"); @@ -101,7 +88,7 @@ public class KafkaChatHomeServiceTest extends ChatHomeServiceWithShardsTest send(messageTemplate, DATA_TOPIC, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 2, \"user\" : \"peter\", \"text\" : \"Willst du mit mir gehen?\" }", "event_chatmessage_received"); send(messageTemplate, DATA_TOPIC, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"klaus\", \"text\" : \"Ja? Nein? Vielleicht??\" }", "event_chatmessage_received"); - consumerTaskExecutor.executeConsumerTask(); + consumerTaskRunner.run(); } static void send( @@ -122,8 +109,8 @@ public class KafkaChatHomeServiceTest extends ChatHomeServiceWithShardsTest } @AfterAll - static void joinConsumerJob(@Autowired ConsumerTaskExecutor consumerTaskExecutor) + static void joinConsumerTasks(@Autowired ConsumerTaskRunner consumerTaskRunner) { - consumerTaskExecutor.joinConsumerTaskJob(); + consumerTaskRunner.joinConsumerTasks(); } }