X-Git-Url: http://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fchat%2Fbackend%2Fimplementation%2Fkafka%2FChannelTaskExecutor.java;h=45206779f9f7371590dc0d6937b965bc9b9a7073;hb=8300dcd98f681893a077051560151a8f1b94e38d;hp=9425bdf46315c12f2cccad39839e032e9c9424a8;hpb=eaec0e92a1887c6b1c0059de1b5db44039dc1dd4;p=demos%2Fkafka%2Fchat diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ChannelTaskExecutor.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ChannelTaskExecutor.java index 9425bdf4..45206779 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ChannelTaskExecutor.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ChannelTaskExecutor.java @@ -1,7 +1,7 @@ package de.juplo.kafka.chat.backend.implementation.kafka; import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo; -import jakarta.annotation.PreDestroy; +import lombok.Getter; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.Consumer; @@ -12,36 +12,36 @@ import java.util.concurrent.CompletableFuture; @RequiredArgsConstructor @Slf4j -public class ConsumerTaskExecutor +public class ChannelTaskExecutor { private final ThreadPoolTaskExecutor taskExecutor; - private final Runnable consumerTask; + @Getter + private final Channel channel; private final Consumer consumer; private final WorkAssignor workAssignor; - CompletableFuture consumerTaskJob; + CompletableFuture channelTaskJob; - public void executeConsumerTask() + public void executeChannelTask() { workAssignor.assignWork(consumer); - log.info("Starting the consumer-task for {}", consumerTask); - consumerTaskJob = taskExecutor - .submitCompletable(consumerTask) + log.info("Starting the consumer-task for {}", channel); + channelTaskJob = taskExecutor + .submitCompletable(channel) .exceptionally(e -> { - log.error("The consumer-task for {} exited abnormally!", consumerTask, e); + log.error("The consumer-task for {} exited abnormally!", channel, e); return null; }); } - @PreDestroy - public void joinConsumerTaskJob() + public void join() { - log.info("Signaling the consumer-task for {} to quit its work", consumerTask); + log.info("Signaling the consumer-task for {} to quit its work", channel); consumer.wakeup(); - log.info("Waiting for the consumer of {} to finish its work", consumerTask); - consumerTaskJob.join(); - log.info("Joined the consumer-task for {}", consumerTask); + log.info("Waiting for the consumer of {} to finish its work", channel); + channelTaskJob.join(); + log.info("Joined the consumer-task for {}", channel); } }