From 6658476e332736a2392170896cd242b1b4a7abe4 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Wed, 13 Sep 2023 22:53:23 +0200 Subject: [PATCH] WIP --- .../kafka/KafkaServicesApplicationRunner.java | 11 +++++----- .../kafka/KafkaServicesConfiguration.java | 14 ++++++++---- .../kafka/KafkaChatHomeServiceTest.java | 22 +++---------------- 3 files changed, 18 insertions(+), 29 deletions(-) diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesApplicationRunner.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesApplicationRunner.java index 125227a0..d9754a23 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesApplicationRunner.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesApplicationRunner.java @@ -1,18 +1,15 @@ package de.juplo.kafka.chat.backend.implementation.kafka; -import de.juplo.kafka.chat.backend.ChatBackendProperties; import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo; import jakarta.annotation.PreDestroy; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.common.TopicPartition; -import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.ApplicationArguments; import org.springframework.boot.ApplicationRunner; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; -import org.springframework.stereotype.Component; import java.util.List; import java.util.concurrent.CompletableFuture; @@ -26,9 +23,12 @@ import java.util.concurrent.CompletableFuture; @Slf4j public class KafkaServicesApplicationRunner implements ApplicationRunner { + private final String infoTopic; private final ThreadPoolTaskExecutor taskExecutor; + private final InfoChannel infoChannel; private final DataChannel dataChannel; - private final Consumer chatRoomChannelConsumer; + private final Consumer infoChannelConsumer; + private final Consumer dataChannelConsumer; private final WorkAssignor workAssignor; CompletableFuture infoChannelConsumerJob; @@ -38,7 +38,6 @@ public class KafkaServicesApplicationRunner implements ApplicationRunner @Override public void run(ApplicationArguments args) throws Exception { - String infoTopic = properties.getKafka().getInfoChannelTopic(); List partitions = infoChannelConsumer .partitionsFor(infoTopic) .stream() @@ -62,7 +61,7 @@ public class KafkaServicesApplicationRunner implements ApplicationRunner Thread.sleep(1000); } - workAssignor.assignWork(chatRoomChannelConsumer); + workAssignor.assignWork(dataChannelConsumer); log.info("Starting the consumer for the DataChannel"); dataChannelConsumerJob = taskExecutor .submitCompletable(dataChannel) diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java index 18611d41..75d011f0 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java @@ -38,15 +38,21 @@ public class KafkaServicesConfiguration { @Bean KafkaServicesApplicationRunner kafkaServicesApplicationRunner( + ChatBackendProperties properties, ThreadPoolTaskExecutor taskExecutor, - ChatRoomChannel chatRoomChannel, - Consumer chatRoomChannelConsumer, + InfoChannel infoChannel, + DataChannel dataChannel, + Consumer infoChannelConsumer, + Consumer dataChannelConsumer, KafkaServicesApplicationRunner.WorkAssignor workAssignor) { return new KafkaServicesApplicationRunner( + properties.getKafka().getInfoChannelTopic(), taskExecutor, - chatRoomChannel, - chatRoomChannelConsumer, + infoChannel, + dataChannel, + infoChannelConsumer, + dataChannelConsumer, workAssignor); } diff --git a/src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatHomeServiceTest.java b/src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatHomeServiceTest.java index a4fc0630..6bf3a742 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatHomeServiceTest.java +++ b/src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatHomeServiceTest.java @@ -8,7 +8,6 @@ import org.apache.kafka.common.TopicPartition; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.boot.DefaultApplicationArguments; import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration; import org.springframework.boot.autoconfigure.task.TaskExecutionAutoConfiguration; import org.springframework.boot.context.properties.EnableConfigurationProperties; @@ -52,12 +51,6 @@ public class KafkaChatHomeServiceTest extends ChatHomeServiceWithShardsTest final static String INFO_TOPIC = "KAFKA_CHAT_HOME_TEST_INFO"; final static String DATA_TOPIC = "KAFKA_CHAT_HOME_TEST_DATA"; -<<<<<<< HEAD -======= - static CompletableFuture INFO_CHANNEL_CONSUMER_JOB; - static CompletableFuture DATA_CHANNEL_CONSUMER_JOB; - ->>>>>>> 5c8db7f (WIP) @TestConfiguration @EnableConfigurationProperties(ChatBackendProperties.class) @@ -65,15 +58,14 @@ public class KafkaChatHomeServiceTest extends ChatHomeServiceWithShardsTest static class KafkaChatHomeTestConfiguration { @Bean - KafkaServicesApplicationRunner.WorkAssignor workAssignor( - ChatRoomChannel chatRoomChannel) + KafkaServicesApplicationRunner.WorkAssignor workAssignor(DataChannel dataChannel) { return consumer -> { List assignedPartitions = - List.of(new TopicPartition(TOPIC, 2)); + List.of(new TopicPartition(DATA_TOPIC, 2)); consumer.assign(assignedPartitions); - chatRoomChannel.onPartitionsAssigned(assignedPartitions); + dataChannel.onPartitionsAssigned(assignedPartitions); }; } @@ -110,14 +102,6 @@ public class KafkaChatHomeServiceTest extends ChatHomeServiceWithShardsTest @AfterAll static void joinConsumerJob(@Autowired KafkaServicesApplicationRunner applicationRunner) { -<<<<<<< HEAD applicationRunner.joinChatRoomChannelConsumerJob(); -======= - log.info("Signaling the consumer of the CahtRoomChannel to quit its work"); - chatRoomChannelConsumer.wakeup(); - log.info("Waiting for the consumer of the ChatRoomChannel to finish its work"); - DATA_CHANNEL_CONSUMER_JOB.join(); - log.info("Joined the consumer of the ChatRoomChannel"); ->>>>>>> 5c8db7f (WIP) } } -- 2.20.1