From 95eb493f6e7abcd515dbaba8c745dc7d6e0cc37b Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Fri, 15 Sep 2023 00:40:45 +0200 Subject: [PATCH] WIP:ALIGN --- .../kafka/ConsumerTaskRunner.java | 34 ++++++++++ .../kafka/KafkaServicesApplicationRunner.java | 65 ++----------------- .../kafka/KafkaServicesConfiguration.java | 16 ++++- 3 files changed, 53 insertions(+), 62 deletions(-) create mode 100644 src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ConsumerTaskRunner.java diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ConsumerTaskRunner.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ConsumerTaskRunner.java new file mode 100644 index 00000000..afc5c2f0 --- /dev/null +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ConsumerTaskRunner.java @@ -0,0 +1,34 @@ +package de.juplo.kafka.chat.backend.implementation.kafka; + +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; + + +@RequiredArgsConstructor +@Slf4j +public class ConsumerTaskRunner +{ + private final ConsumerTaskExecutor infoChannelConsumerTaskExecutor; + private final ConsumerTaskExecutor dataChannelConsumerTaskExecutor; + + private final InfoChannel infoChannel; + + void run() throws InterruptedException + { + infoChannelConsumerTaskExecutor.executeConsumerTask(); + + while (infoChannel.loadInProgress()) + { + log.info("InfoChannel is still loading..."); + Thread.sleep(1000); + } + + dataChannelConsumerTaskExecutor.executeConsumerTask(); + } + + void joinTasks() + { + dataChannelConsumerTaskExecutor.joinConsumerTaskJob(); + infoChannelConsumerTaskExecutor.joinConsumerTaskJob(); + } +} diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesApplicationRunner.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesApplicationRunner.java index 6810d06a..936f81fd 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesApplicationRunner.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesApplicationRunner.java @@ -1,9 +1,8 @@ package de.juplo.kafka.chat.backend.implementation.kafka; +import jakarta.annotation.PreDestroy; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.consumer.Consumer; -import org.apache.kafka.common.TopicPartition; import org.springframework.boot.ApplicationArguments; import org.springframework.boot.ApplicationRunner; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; @@ -19,72 +18,18 @@ import org.springframework.stereotype.Component; @Slf4j public class KafkaServicesApplicationRunner implements ApplicationRunner { - private final String infoTopic; - private final ThreadPoolTaskExecutor taskExecutor; - private final InfoChannel infoChannel; - private final DataChannel dataChannel; - private final Consumer infoChannelConsumer; - private final Consumer dataChannelConsumer; - private final WorkAssignor workAssignor; - - CompletableFuture infoChannelConsumerJob; - CompletableFuture dataChannelConsumerJob; ->>>>>>> 7fb62d3 (WIP:ALIGN) + private final ConsumerTaskRunner consumerTaskRunner; @Override public void run(ApplicationArguments args) throws Exception { -<<<<<<< HEAD - chatRoomChannelTaskExecutor.executeConsumerTask(); -======= - List partitions = infoChannelConsumer - .partitionsFor(infoTopic) - .stream() - .map(partitionInfo -> new TopicPartition( - infoTopic, - partitionInfo.partition())) - .toList(); - infoChannelConsumer.assign(partitions); - log.info("Starting the consumer for the InfoChannel"); - infoChannelConsumerJob = taskExecutor - .submitCompletable(infoChannel) - .exceptionally(e -> - { - log.error("The consumer for the InfoChannel exited abnormally!", e); - return null; - }); - - while (infoChannel.loadInProgress()) - { - log.info("InfoChannel is still loading..."); - Thread.sleep(1000); - } - - workAssignor.assignWork(dataChannelConsumer); - log.info("Starting the consumer for the DataChannel"); - dataChannelConsumerJob = taskExecutor - .submitCompletable(dataChannel) - .exceptionally(e -> - { - log.error("The consumer for the DataChannel exited abnormally!", e); - return null; - }); + consumerTaskRunner.run(); } @PreDestroy - public void joinChatRoomChannelConsumerJob() - { - log.info("Signaling the consumer of the CahtRoomChannel to quit its work"); - infoChannelConsumer.wakeup(); - log.info("Waiting for the consumer of the ChatRoomChannel to finish its work"); - dataChannelConsumerJob.join(); - log.info("Joined the consumer of the ChatRoomChannel"); - } - - - interface WorkAssignor + public void joinConsumerTasks() { - void assignWork(Consumer consumer); + consumerTaskRunner.joinTasks(); } } diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java index 58a470d7..997b5f16 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java @@ -36,7 +36,19 @@ import java.util.Properties; public class KafkaServicesConfiguration { @Bean - ConsumerTaskExecutor infoChannelTaskExecutor( + ConsumerTaskRunner consumerTaskRunner( + ConsumerTaskExecutor infoChannelConsumerTaskExecutor, + ConsumerTaskExecutor dataChannelConsumerTaskExecutor, + InfoChannel infoChannel) + { + return new ConsumerTaskRunner( + infoChannelConsumerTaskExecutor, + dataChannelConsumerTaskExecutor, + infoChannel); + } + + @Bean + ConsumerTaskExecutor infoChannelConsumerTaskExecutor( ThreadPoolTaskExecutor taskExecutor, InfoChannel infoChannel, Consumer dataChannelConsumer, @@ -50,7 +62,7 @@ public class KafkaServicesConfiguration } @Bean - ConsumerTaskExecutor dataChannelTaskExecutor( + ConsumerTaskExecutor dataChannelConsumerTaskExecutor( ThreadPoolTaskExecutor taskExecutor, DataChannel dataChannel, Consumer dataChannelConsumer, -- 2.20.1