From: Kai Moritz Date: Mon, 4 Mar 2024 14:05:14 +0000 (+0100) Subject: WIP:refactor: Refined channel-states, introduced `ChannelState` -- MOVE X-Git-Tag: rebase--2024-03-05--09-07~9 X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=98f6a8e1034bb1827604e5dbb980b17ea256c69f;p=demos%2Fkafka%2Fchat WIP:refactor: Refined channel-states, introduced `ChannelState` -- MOVE --- diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ChannelExecutor.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ChannelExecutor.java new file mode 100644 index 00000000..9425bdf4 --- /dev/null +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ChannelExecutor.java @@ -0,0 +1,47 @@ +package de.juplo.kafka.chat.backend.implementation.kafka; + +import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo; +import jakarta.annotation.PreDestroy; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.Consumer; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; + +import java.util.concurrent.CompletableFuture; + + +@RequiredArgsConstructor +@Slf4j +public class ConsumerTaskExecutor +{ + private final ThreadPoolTaskExecutor taskExecutor; + private final Runnable consumerTask; + private final Consumer consumer; + private final WorkAssignor workAssignor; + + CompletableFuture consumerTaskJob; + + + public void executeConsumerTask() + { + workAssignor.assignWork(consumer); + log.info("Starting the consumer-task for {}", consumerTask); + consumerTaskJob = taskExecutor + .submitCompletable(consumerTask) + .exceptionally(e -> + { + log.error("The consumer-task for {} exited abnormally!", consumerTask, e); + return null; + }); + } + + @PreDestroy + public void joinConsumerTaskJob() + { + log.info("Signaling the consumer-task for {} to quit its work", consumerTask); + consumer.wakeup(); + log.info("Waiting for the consumer of {} to finish its work", consumerTask); + consumerTaskJob.join(); + log.info("Joined the consumer-task for {}", consumerTask); + } +} diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ChannelRunner.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ChannelRunner.java new file mode 100644 index 00000000..0c6de1d5 --- /dev/null +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ChannelRunner.java @@ -0,0 +1,31 @@ +package de.juplo.kafka.chat.backend.implementation.kafka; + +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; + + +@RequiredArgsConstructor +@Slf4j +public class ConsumerTaskRunner +{ + private final ConsumerTaskExecutor infoChannelConsumerTaskExecutor; + private final ConsumerTaskExecutor dataChannelConsumerTaskExecutor; + private final InfoChannel infoChannel; + + public void executeConsumerTasks() + { + infoChannelConsumerTaskExecutor.executeConsumerTask(); + dataChannelConsumerTaskExecutor.executeConsumerTask(); + } + + public void joinConsumerTasks() throws InterruptedException + { + dataChannelConsumerTaskExecutor.joinConsumerTaskJob(); + while (infoChannel.getChannelState() != ChannelState.SHUTTING_DOWN) + { + log.info("Waiting for {} to shut down...", infoChannel); + Thread.sleep(1000); + } + infoChannelConsumerTaskExecutor.joinConsumerTaskJob(); + } +} diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ConsumerTaskExecutor.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ConsumerTaskExecutor.java deleted file mode 100644 index 9425bdf4..00000000 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ConsumerTaskExecutor.java +++ /dev/null @@ -1,47 +0,0 @@ -package de.juplo.kafka.chat.backend.implementation.kafka; - -import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo; -import jakarta.annotation.PreDestroy; -import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.consumer.Consumer; -import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; - -import java.util.concurrent.CompletableFuture; - - -@RequiredArgsConstructor -@Slf4j -public class ConsumerTaskExecutor -{ - private final ThreadPoolTaskExecutor taskExecutor; - private final Runnable consumerTask; - private final Consumer consumer; - private final WorkAssignor workAssignor; - - CompletableFuture consumerTaskJob; - - - public void executeConsumerTask() - { - workAssignor.assignWork(consumer); - log.info("Starting the consumer-task for {}", consumerTask); - consumerTaskJob = taskExecutor - .submitCompletable(consumerTask) - .exceptionally(e -> - { - log.error("The consumer-task for {} exited abnormally!", consumerTask, e); - return null; - }); - } - - @PreDestroy - public void joinConsumerTaskJob() - { - log.info("Signaling the consumer-task for {} to quit its work", consumerTask); - consumer.wakeup(); - log.info("Waiting for the consumer of {} to finish its work", consumerTask); - consumerTaskJob.join(); - log.info("Joined the consumer-task for {}", consumerTask); - } -} diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ConsumerTaskRunner.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ConsumerTaskRunner.java deleted file mode 100644 index 0c6de1d5..00000000 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ConsumerTaskRunner.java +++ /dev/null @@ -1,31 +0,0 @@ -package de.juplo.kafka.chat.backend.implementation.kafka; - -import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; - - -@RequiredArgsConstructor -@Slf4j -public class ConsumerTaskRunner -{ - private final ConsumerTaskExecutor infoChannelConsumerTaskExecutor; - private final ConsumerTaskExecutor dataChannelConsumerTaskExecutor; - private final InfoChannel infoChannel; - - public void executeConsumerTasks() - { - infoChannelConsumerTaskExecutor.executeConsumerTask(); - dataChannelConsumerTaskExecutor.executeConsumerTask(); - } - - public void joinConsumerTasks() throws InterruptedException - { - dataChannelConsumerTaskExecutor.joinConsumerTaskJob(); - while (infoChannel.getChannelState() != ChannelState.SHUTTING_DOWN) - { - log.info("Waiting for {} to shut down...", infoChannel); - Thread.sleep(1000); - } - infoChannelConsumerTaskExecutor.joinConsumerTaskJob(); - } -}