From: Kai Moritz Date: Mon, 4 Mar 2024 14:10:28 +0000 (+0100) Subject: WIP:refactor: Refined channel-states, introduced `ChannelState` -- MOVE X-Git-Tag: rebase--2024-03-05--09-07~6 X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=13e01af48efb7dbc31dc6031ed558f68f8e2de2f;p=demos%2Fkafka%2Fchat WIP:refactor: Refined channel-states, introduced `ChannelState` -- MOVE --- diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ChannelExecutor.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ChannelExecutor.java deleted file mode 100644 index 087d94d3..00000000 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ChannelExecutor.java +++ /dev/null @@ -1,49 +0,0 @@ -package de.juplo.kafka.chat.backend.implementation.kafka; - -import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo; -import jakarta.annotation.PreDestroy; -import lombok.Getter; -import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.consumer.Consumer; -import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; - -import java.util.concurrent.CompletableFuture; - - -@RequiredArgsConstructor -@Slf4j -public class ChannelExecutor -{ - private final ThreadPoolTaskExecutor taskExecutor; - @Getter - private final Channel channel; - private final Consumer consumer; - private final WorkAssignor workAssignor; - - CompletableFuture consumerTaskJob; - - - public void executeConsumerTask() - { - workAssignor.assignWork(consumer); - log.info("Starting the consumer-task for {}", channel); - consumerTaskJob = taskExecutor - .submitCompletable(channel) - .exceptionally(e -> - { - log.error("The consumer-task for {} exited abnormally!", channel, e); - return null; - }); - } - - @PreDestroy - public void joinConsumerTaskJob() - { - log.info("Signaling the consumer-task for {} to quit its work", channel); - consumer.wakeup(); - log.info("Waiting for the consumer of {} to finish its work", channel); - consumerTaskJob.join(); - log.info("Joined the consumer-task for {}", channel); - } -} diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ChannelRunner.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ChannelRunner.java deleted file mode 100644 index 92db79bf..00000000 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ChannelRunner.java +++ /dev/null @@ -1,32 +0,0 @@ -package de.juplo.kafka.chat.backend.implementation.kafka; - -import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; - - -@RequiredArgsConstructor -@Slf4j -public class ChannelRunner -{ - private final ChannelExecutor infoChannelExecutor; - private final ChannelExecutor dataChannelExecutor; - - public void executeChannel() - { - infoChannelExecutor.executeConsumerTask(); - dataChannelExecutor.executeConsumerTask(); - } - - public void joinChannel() throws InterruptedException - { - dataChannelExecutor.joinConsumerTaskJob(); - while (infoChannel.getChannelState() != ChannelState.SHUTTING_DOWN) - { - log.info("Waiting for {} to shut down...", infoChannel); - Thread.sleep(1000); - } - infoChannelExecutor.joinConsumerTaskJob(); - } - - private void joinChannel() -} diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ChannelTaskExecutor.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ChannelTaskExecutor.java new file mode 100644 index 00000000..087d94d3 --- /dev/null +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ChannelTaskExecutor.java @@ -0,0 +1,49 @@ +package de.juplo.kafka.chat.backend.implementation.kafka; + +import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo; +import jakarta.annotation.PreDestroy; +import lombok.Getter; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.Consumer; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; + +import java.util.concurrent.CompletableFuture; + + +@RequiredArgsConstructor +@Slf4j +public class ChannelExecutor +{ + private final ThreadPoolTaskExecutor taskExecutor; + @Getter + private final Channel channel; + private final Consumer consumer; + private final WorkAssignor workAssignor; + + CompletableFuture consumerTaskJob; + + + public void executeConsumerTask() + { + workAssignor.assignWork(consumer); + log.info("Starting the consumer-task for {}", channel); + consumerTaskJob = taskExecutor + .submitCompletable(channel) + .exceptionally(e -> + { + log.error("The consumer-task for {} exited abnormally!", channel, e); + return null; + }); + } + + @PreDestroy + public void joinConsumerTaskJob() + { + log.info("Signaling the consumer-task for {} to quit its work", channel); + consumer.wakeup(); + log.info("Waiting for the consumer of {} to finish its work", channel); + consumerTaskJob.join(); + log.info("Joined the consumer-task for {}", channel); + } +} diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ChannelTaskRunner.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ChannelTaskRunner.java new file mode 100644 index 00000000..92db79bf --- /dev/null +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ChannelTaskRunner.java @@ -0,0 +1,32 @@ +package de.juplo.kafka.chat.backend.implementation.kafka; + +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; + + +@RequiredArgsConstructor +@Slf4j +public class ChannelRunner +{ + private final ChannelExecutor infoChannelExecutor; + private final ChannelExecutor dataChannelExecutor; + + public void executeChannel() + { + infoChannelExecutor.executeConsumerTask(); + dataChannelExecutor.executeConsumerTask(); + } + + public void joinChannel() throws InterruptedException + { + dataChannelExecutor.joinConsumerTaskJob(); + while (infoChannel.getChannelState() != ChannelState.SHUTTING_DOWN) + { + log.info("Waiting for {} to shut down...", infoChannel); + Thread.sleep(1000); + } + infoChannelExecutor.joinConsumerTaskJob(); + } + + private void joinChannel() +}