From: Kai Moritz Date: Thu, 14 Sep 2023 22:40:45 +0000 (+0200) Subject: WIP:ALIGN X-Git-Tag: rebase--2023-09-15--10-15~9 X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=95eb493f6e7abcd515dbaba8c745dc7d6e0cc37b;p=demos%2Fkafka%2Fchat WIP:ALIGN --- diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ConsumerTaskRunner.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ConsumerTaskRunner.java new file mode 100644 index 00000000..afc5c2f0 --- /dev/null +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ConsumerTaskRunner.java @@ -0,0 +1,34 @@ +package de.juplo.kafka.chat.backend.implementation.kafka; + +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; + + +@RequiredArgsConstructor +@Slf4j +public class ConsumerTaskRunner +{ + private final ConsumerTaskExecutor infoChannelConsumerTaskExecutor; + private final ConsumerTaskExecutor dataChannelConsumerTaskExecutor; + + private final InfoChannel infoChannel; + + void run() throws InterruptedException + { + infoChannelConsumerTaskExecutor.executeConsumerTask(); + + while (infoChannel.loadInProgress()) + { + log.info("InfoChannel is still loading..."); + Thread.sleep(1000); + } + + dataChannelConsumerTaskExecutor.executeConsumerTask(); + } + + void joinTasks() + { + dataChannelConsumerTaskExecutor.joinConsumerTaskJob(); + infoChannelConsumerTaskExecutor.joinConsumerTaskJob(); + } +} diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesApplicationRunner.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesApplicationRunner.java index 6810d06a..936f81fd 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesApplicationRunner.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesApplicationRunner.java @@ -1,9 +1,8 @@ package de.juplo.kafka.chat.backend.implementation.kafka; +import jakarta.annotation.PreDestroy; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.consumer.Consumer; -import org.apache.kafka.common.TopicPartition; import org.springframework.boot.ApplicationArguments; import org.springframework.boot.ApplicationRunner; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; @@ -19,72 +18,18 @@ import org.springframework.stereotype.Component; @Slf4j public class KafkaServicesApplicationRunner implements ApplicationRunner { - private final String infoTopic; - private final ThreadPoolTaskExecutor taskExecutor; - private final InfoChannel infoChannel; - private final DataChannel dataChannel; - private final Consumer infoChannelConsumer; - private final Consumer dataChannelConsumer; - private final WorkAssignor workAssignor; - - CompletableFuture infoChannelConsumerJob; - CompletableFuture dataChannelConsumerJob; ->>>>>>> 7fb62d3 (WIP:ALIGN) + private final ConsumerTaskRunner consumerTaskRunner; @Override public void run(ApplicationArguments args) throws Exception { -<<<<<<< HEAD - chatRoomChannelTaskExecutor.executeConsumerTask(); -======= - List partitions = infoChannelConsumer - .partitionsFor(infoTopic) - .stream() - .map(partitionInfo -> new TopicPartition( - infoTopic, - partitionInfo.partition())) - .toList(); - infoChannelConsumer.assign(partitions); - log.info("Starting the consumer for the InfoChannel"); - infoChannelConsumerJob = taskExecutor - .submitCompletable(infoChannel) - .exceptionally(e -> - { - log.error("The consumer for the InfoChannel exited abnormally!", e); - return null; - }); - - while (infoChannel.loadInProgress()) - { - log.info("InfoChannel is still loading..."); - Thread.sleep(1000); - } - - workAssignor.assignWork(dataChannelConsumer); - log.info("Starting the consumer for the DataChannel"); - dataChannelConsumerJob = taskExecutor - .submitCompletable(dataChannel) - .exceptionally(e -> - { - log.error("The consumer for the DataChannel exited abnormally!", e); - return null; - }); + consumerTaskRunner.run(); } @PreDestroy - public void joinChatRoomChannelConsumerJob() - { - log.info("Signaling the consumer of the CahtRoomChannel to quit its work"); - infoChannelConsumer.wakeup(); - log.info("Waiting for the consumer of the ChatRoomChannel to finish its work"); - dataChannelConsumerJob.join(); - log.info("Joined the consumer of the ChatRoomChannel"); - } - - - interface WorkAssignor + public void joinConsumerTasks() { - void assignWork(Consumer consumer); + consumerTaskRunner.joinTasks(); } } diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java index 58a470d7..997b5f16 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java @@ -36,7 +36,19 @@ import java.util.Properties; public class KafkaServicesConfiguration { @Bean - ConsumerTaskExecutor infoChannelTaskExecutor( + ConsumerTaskRunner consumerTaskRunner( + ConsumerTaskExecutor infoChannelConsumerTaskExecutor, + ConsumerTaskExecutor dataChannelConsumerTaskExecutor, + InfoChannel infoChannel) + { + return new ConsumerTaskRunner( + infoChannelConsumerTaskExecutor, + dataChannelConsumerTaskExecutor, + infoChannel); + } + + @Bean + ConsumerTaskExecutor infoChannelConsumerTaskExecutor( ThreadPoolTaskExecutor taskExecutor, InfoChannel infoChannel, Consumer dataChannelConsumer, @@ -50,7 +62,7 @@ public class KafkaServicesConfiguration } @Bean - ConsumerTaskExecutor dataChannelTaskExecutor( + ConsumerTaskExecutor dataChannelConsumerTaskExecutor( ThreadPoolTaskExecutor taskExecutor, DataChannel dataChannel, Consumer dataChannelConsumer,