X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fchat%2Fbackend%2Fimplementation%2Fkafka%2FConsumerTaskExecutor.java;h=9425bdf46315c12f2cccad39839e032e9c9424a8;hb=efb070ce6e1e7ea1bb3297147b4e5a4bee3967cd;hp=b635dfc1b63c93a51459f4652fbe8669a93aebd0;hpb=fd4ec34a1ec3ebc1c6a1e34abb2124c3b97f224d;p=demos%2Fkafka%2Fchat diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ConsumerTaskExecutor.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ConsumerTaskExecutor.java index b635dfc1..9425bdf4 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ConsumerTaskExecutor.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ConsumerTaskExecutor.java @@ -5,58 +5,43 @@ import jakarta.annotation.PreDestroy; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.Consumer; -import org.springframework.boot.ApplicationArguments; -import org.springframework.boot.ApplicationRunner; -import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; -import java.util.List; import java.util.concurrent.CompletableFuture; -@ConditionalOnProperty( - prefix = "chat.backend", - name = "services", - havingValue = "kafka") @RequiredArgsConstructor @Slf4j -public class KafkaServicesApplicationRunner implements ApplicationRunner +public class ConsumerTaskExecutor { private final ThreadPoolTaskExecutor taskExecutor; - private final ChatRoomChannel chatRoomChannel; - private final Consumer chatRoomChannelConsumer; + private final Runnable consumerTask; + private final Consumer consumer; private final WorkAssignor workAssignor; - CompletableFuture chatRoomChannelConsumerJob; + CompletableFuture consumerTaskJob; - @Override - public void run(ApplicationArguments args) throws Exception + public void executeConsumerTask() { - workAssignor.assignWork(chatRoomChannelConsumer); - log.info("Starting the consumer for the ChatRoomChannel"); - chatRoomChannelConsumerJob = taskExecutor - .submitCompletable(chatRoomChannel) + workAssignor.assignWork(consumer); + log.info("Starting the consumer-task for {}", consumerTask); + consumerTaskJob = taskExecutor + .submitCompletable(consumerTask) .exceptionally(e -> { - log.error("The consumer for the ChatRoomChannel exited abnormally!", e); + log.error("The consumer-task for {} exited abnormally!", consumerTask, e); return null; }); } @PreDestroy - public void joinChatRoomChannelConsumerJob() + public void joinConsumerTaskJob() { - log.info("Signaling the consumer of the CahtRoomChannel to quit its work"); - chatRoomChannelConsumer.wakeup(); - log.info("Waiting for the consumer of the ChatRoomChannel to finish its work"); - chatRoomChannelConsumerJob.join(); - log.info("Joined the consumer of the ChatRoomChannel"); - } - - - interface WorkAssignor - { - void assignWork(Consumer consumer); + log.info("Signaling the consumer-task for {} to quit its work", consumerTask); + consumer.wakeup(); + log.info("Waiting for the consumer of {} to finish its work", consumerTask); + consumerTaskJob.join(); + log.info("Joined the consumer-task for {}", consumerTask); } }