From c624e5bb93bfd544ae728e4434c7200209b13180 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Wed, 13 Sep 2023 21:30:36 +0200 Subject: [PATCH] WIP --- .../kafka/KafkaServicesConfiguration.java | 23 ++++++---- .../kafka/KafkaChatHomeServiceTest.java | 42 ++++++++++--------- 2 files changed, 38 insertions(+), 27 deletions(-) diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java index 373fcb43..84a14407 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java @@ -38,20 +38,29 @@ public class KafkaServicesConfiguration { @Bean KafkaServicesApplicationRunner kafkaServicesApplicationRunner( - ChatBackendProperties properties, ThreadPoolTaskExecutor taskExecutor, ChatRoomChannel chatRoomChannel, - Consumer chatRoomChannelConsumer) + Consumer chatRoomChannelConsumer, + KafkaServicesApplicationRunner.WorkAssignor workAssignor) { return new KafkaServicesApplicationRunner( taskExecutor, chatRoomChannel, chatRoomChannelConsumer, - consumer -> - { - List topics = List.of(properties.getKafka().getChatRoomChannelTopic()); - consumer.subscribe(topics, chatRoomChannel); - }); + workAssignor); + } + + @Bean + KafkaServicesApplicationRunner.WorkAssignor workAssignor( + ChatBackendProperties properties, + ChatRoomChannel chatRoomChannel) + { + return consumer -> + { + List topics = + List.of(properties.getKafka().getChatRoomChannelTopic()); + consumer.subscribe(topics, chatRoomChannel); + }; } @Bean diff --git a/src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatHomeServiceTest.java b/src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatHomeServiceTest.java index 20a09369..c2f12cb7 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatHomeServiceTest.java +++ b/src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatHomeServiceTest.java @@ -9,6 +9,7 @@ import org.apache.kafka.common.TopicPartition; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.DefaultApplicationArguments; import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration; import org.springframework.boot.autoconfigure.task.TaskExecutionAutoConfiguration; import org.springframework.boot.context.properties.EnableConfigurationProperties; @@ -22,7 +23,6 @@ import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import java.time.Clock; import java.util.List; -import java.util.concurrent.CompletableFuture; import static de.juplo.kafka.chat.backend.domain.ChatHomeServiceWithShardsTest.NUM_SHARDS; import static de.juplo.kafka.chat.backend.implementation.kafka.KafkaChatHomeServiceTest.TOPIC; @@ -54,6 +54,17 @@ public class KafkaChatHomeServiceTest extends ChatHomeServiceWithShardsTest @EnableConfigurationProperties(ChatBackendProperties.class) static class KafkaChatHomeTestConfiguration { + @Bean + KafkaServicesApplicationRunner.WorkAssignor workAssignor() + { + return consumer -> + { + List assignedPartitions = + List.of(new TopicPartition(TOPIC, 2)); + consumer.assign(assignedPartitions); + }; + } + @Bean Clock clock() { @@ -64,10 +75,11 @@ public class KafkaChatHomeServiceTest extends ChatHomeServiceWithShardsTest @BeforeAll public static void sendAndLoadStoredData( - @Autowired KafkaTemplate messageTemplate, - @Autowired Consumer chatRoomChannelConsumer, - @Autowired ThreadPoolTaskExecutor taskExecutor, - @Autowired ChatRoomChannel chatRoomChannel) + @Autowired + KafkaTemplate messageTemplate, + @Autowired + KafkaServicesApplicationRunner applicationRunner) + throws Exception { send(messageTemplate, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\": \"5c73531c-6fc4-426c-adcb-afc5c140a0f7\", \"shard\": 2, \"name\": \"FOO\" }", "command_create_chatroom"); send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"peter\", \"text\" : \"Hallo, ich heiße Peter!\" }", "event_chatmessage_received"); @@ -76,15 +88,7 @@ public class KafkaChatHomeServiceTest extends ChatHomeServiceWithShardsTest send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"klaus\", \"text\" : \"Ja? Nein? Vielleicht??\" }", "event_chatmessage_received"); List assignedPartitions = List.of(new TopicPartition(TOPIC, 2)); - chatRoomChannelConsumer.assign(assignedPartitions); - chatRoomChannel.onPartitionsAssigned(assignedPartitions); - CONSUMER_JOB = taskExecutor - .submitCompletable(chatRoomChannel) - .exceptionally(e -> - { - log.error("The consumer for the ChatRoomChannel exited abnormally!", e); - return null; - }); + applicationRunner.run(new DefaultApplicationArguments()); } static void send(KafkaTemplate kafkaTemplate, String key, String value, String typeId) @@ -100,12 +104,10 @@ public class KafkaChatHomeServiceTest extends ChatHomeServiceWithShardsTest } @AfterAll - static void joinConsumerJob(@Autowired Consumer chatRoomChannelConsumer) + static void joinConsumerJob( + @Autowired + KafkaServicesApplicationRunner applicationRunner) { - log.info("Signaling the consumer of the CahtRoomChannel to quit its work"); - chatRoomChannelConsumer.wakeup(); - log.info("Waiting for the consumer of the ChatRoomChannel to finish its work"); - CONSUMER_JOB.join(); - log.info("Joined the consumer of the ChatRoomChannel"); + applicationRunner.joinChatRoomChannelConsumerJob(); } } -- 2.20.1