X-Git-Url: http://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fchat%2Fbackend%2Fimplementation%2Fkafka%2FChannelTaskExecutor.java;fp=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fchat%2Fbackend%2Fimplementation%2Fkafka%2FChannelTaskExecutor.java;h=9425bdf46315c12f2cccad39839e032e9c9424a8;hb=eaec0e92a1887c6b1c0059de1b5db44039dc1dd4;hp=0000000000000000000000000000000000000000;hpb=b3604ee493ab20909fa63486a9abbc2c252abac8;p=demos%2Fkafka%2Fchat diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ChannelTaskExecutor.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ChannelTaskExecutor.java new file mode 100644 index 00000000..9425bdf4 --- /dev/null +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ChannelTaskExecutor.java @@ -0,0 +1,47 @@ +package de.juplo.kafka.chat.backend.implementation.kafka; + +import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo; +import jakarta.annotation.PreDestroy; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.Consumer; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; + +import java.util.concurrent.CompletableFuture; + + +@RequiredArgsConstructor +@Slf4j +public class ConsumerTaskExecutor +{ + private final ThreadPoolTaskExecutor taskExecutor; + private final Runnable consumerTask; + private final Consumer consumer; + private final WorkAssignor workAssignor; + + CompletableFuture consumerTaskJob; + + + public void executeConsumerTask() + { + workAssignor.assignWork(consumer); + log.info("Starting the consumer-task for {}", consumerTask); + consumerTaskJob = taskExecutor + .submitCompletable(consumerTask) + .exceptionally(e -> + { + log.error("The consumer-task for {} exited abnormally!", consumerTask, e); + return null; + }); + } + + @PreDestroy + public void joinConsumerTaskJob() + { + log.info("Signaling the consumer-task for {} to quit its work", consumerTask); + consumer.wakeup(); + log.info("Waiting for the consumer of {} to finish its work", consumerTask); + consumerTaskJob.join(); + log.info("Joined the consumer-task for {}", consumerTask); + } +}