From: Kai Moritz Date: Mon, 4 Mar 2024 13:32:14 +0000 (+0100) Subject: refactor: Refined channel-states, introduced `ChannelState` -- MOVE X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=eaec0e92a1887c6b1c0059de1b5db44039dc1dd4;p=demos%2Fkafka%2Fchat refactor: Refined channel-states, introduced `ChannelState` -- MOVE * Renamed and moved `LoadInProgressException` ** Moved exception into implementation-specific package ** Renamed exception to `ChannelNotReadyException` * Renamed `ConsumerTaskExecutor` into `ChannelTaskExecutor` * Renamed `ConsumerTaskRunner` into `ChannelTaskRunner` --- diff --git a/src/main/java/de/juplo/kafka/chat/backend/domain/exceptions/LoadInProgressException.java b/src/main/java/de/juplo/kafka/chat/backend/domain/exceptions/LoadInProgressException.java deleted file mode 100644 index 8a0a81f9..00000000 --- a/src/main/java/de/juplo/kafka/chat/backend/domain/exceptions/LoadInProgressException.java +++ /dev/null @@ -1,10 +0,0 @@ -package de.juplo.kafka.chat.backend.domain.exceptions; - - -public class LoadInProgressException extends IllegalStateException -{ - public LoadInProgressException() - { - super("Load in progress..."); - } -} diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ChannelNotReadyException.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ChannelNotReadyException.java new file mode 100644 index 00000000..8a0a81f9 --- /dev/null +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ChannelNotReadyException.java @@ -0,0 +1,10 @@ +package de.juplo.kafka.chat.backend.domain.exceptions; + + +public class LoadInProgressException extends IllegalStateException +{ + public LoadInProgressException() + { + super("Load in progress..."); + } +} diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ChannelTaskExecutor.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ChannelTaskExecutor.java new file mode 100644 index 00000000..9425bdf4 --- /dev/null +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ChannelTaskExecutor.java @@ -0,0 +1,47 @@ +package de.juplo.kafka.chat.backend.implementation.kafka; + +import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo; +import jakarta.annotation.PreDestroy; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.Consumer; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; + +import java.util.concurrent.CompletableFuture; + + +@RequiredArgsConstructor +@Slf4j +public class ConsumerTaskExecutor +{ + private final ThreadPoolTaskExecutor taskExecutor; + private final Runnable consumerTask; + private final Consumer consumer; + private final WorkAssignor workAssignor; + + CompletableFuture consumerTaskJob; + + + public void executeConsumerTask() + { + workAssignor.assignWork(consumer); + log.info("Starting the consumer-task for {}", consumerTask); + consumerTaskJob = taskExecutor + .submitCompletable(consumerTask) + .exceptionally(e -> + { + log.error("The consumer-task for {} exited abnormally!", consumerTask, e); + return null; + }); + } + + @PreDestroy + public void joinConsumerTaskJob() + { + log.info("Signaling the consumer-task for {} to quit its work", consumerTask); + consumer.wakeup(); + log.info("Waiting for the consumer of {} to finish its work", consumerTask); + consumerTaskJob.join(); + log.info("Joined the consumer-task for {}", consumerTask); + } +} diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ChannelTaskRunner.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ChannelTaskRunner.java new file mode 100644 index 00000000..c2c28014 --- /dev/null +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ChannelTaskRunner.java @@ -0,0 +1,31 @@ +package de.juplo.kafka.chat.backend.implementation.kafka; + +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; + + +@RequiredArgsConstructor +@Slf4j +public class ConsumerTaskRunner +{ + private final ConsumerTaskExecutor infoChannelConsumerTaskExecutor; + private final ConsumerTaskExecutor dataChannelConsumerTaskExecutor; + private final InfoChannel infoChannel; + + public void executeConsumerTasks() + { + infoChannelConsumerTaskExecutor.executeConsumerTask(); + dataChannelConsumerTaskExecutor.executeConsumerTask(); + } + + public void joinConsumerTasks() throws InterruptedException + { + dataChannelConsumerTaskExecutor.joinConsumerTaskJob(); + while (infoChannel.isLoadInProgress()) + { + log.info("Waiting for {} to finish loading...", infoChannel); + Thread.sleep(1000); + } + infoChannelConsumerTaskExecutor.joinConsumerTaskJob(); + } +} diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ConsumerTaskExecutor.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ConsumerTaskExecutor.java deleted file mode 100644 index 9425bdf4..00000000 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ConsumerTaskExecutor.java +++ /dev/null @@ -1,47 +0,0 @@ -package de.juplo.kafka.chat.backend.implementation.kafka; - -import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo; -import jakarta.annotation.PreDestroy; -import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.consumer.Consumer; -import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; - -import java.util.concurrent.CompletableFuture; - - -@RequiredArgsConstructor -@Slf4j -public class ConsumerTaskExecutor -{ - private final ThreadPoolTaskExecutor taskExecutor; - private final Runnable consumerTask; - private final Consumer consumer; - private final WorkAssignor workAssignor; - - CompletableFuture consumerTaskJob; - - - public void executeConsumerTask() - { - workAssignor.assignWork(consumer); - log.info("Starting the consumer-task for {}", consumerTask); - consumerTaskJob = taskExecutor - .submitCompletable(consumerTask) - .exceptionally(e -> - { - log.error("The consumer-task for {} exited abnormally!", consumerTask, e); - return null; - }); - } - - @PreDestroy - public void joinConsumerTaskJob() - { - log.info("Signaling the consumer-task for {} to quit its work", consumerTask); - consumer.wakeup(); - log.info("Waiting for the consumer of {} to finish its work", consumerTask); - consumerTaskJob.join(); - log.info("Joined the consumer-task for {}", consumerTask); - } -} diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ConsumerTaskRunner.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ConsumerTaskRunner.java deleted file mode 100644 index c2c28014..00000000 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ConsumerTaskRunner.java +++ /dev/null @@ -1,31 +0,0 @@ -package de.juplo.kafka.chat.backend.implementation.kafka; - -import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; - - -@RequiredArgsConstructor -@Slf4j -public class ConsumerTaskRunner -{ - private final ConsumerTaskExecutor infoChannelConsumerTaskExecutor; - private final ConsumerTaskExecutor dataChannelConsumerTaskExecutor; - private final InfoChannel infoChannel; - - public void executeConsumerTasks() - { - infoChannelConsumerTaskExecutor.executeConsumerTask(); - dataChannelConsumerTaskExecutor.executeConsumerTask(); - } - - public void joinConsumerTasks() throws InterruptedException - { - dataChannelConsumerTaskExecutor.joinConsumerTaskJob(); - while (infoChannel.isLoadInProgress()) - { - log.info("Waiting for {} to finish loading...", infoChannel); - Thread.sleep(1000); - } - infoChannelConsumerTaskExecutor.joinConsumerTaskJob(); - } -}