From: Kai Moritz Date: Thu, 14 Sep 2023 21:41:46 +0000 (+0200) Subject: refactor: Introduced `ConsumerTaskExecutor` -- Copied class X-Git-Tag: rebase--2024-02-20--15-07~53 X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=428fc79739c8d9a74cf1d68d663b75e5efc17e83;p=demos%2Fkafka%2Fchat refactor: Introduced `ConsumerTaskExecutor` -- Copied class --- diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ConsumerTaskExecutor.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ConsumerTaskExecutor.java new file mode 100644 index 00000000..b635dfc1 --- /dev/null +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ConsumerTaskExecutor.java @@ -0,0 +1,62 @@ +package de.juplo.kafka.chat.backend.implementation.kafka; + +import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo; +import jakarta.annotation.PreDestroy; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.Consumer; +import org.springframework.boot.ApplicationArguments; +import org.springframework.boot.ApplicationRunner; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; + +import java.util.List; +import java.util.concurrent.CompletableFuture; + + +@ConditionalOnProperty( + prefix = "chat.backend", + name = "services", + havingValue = "kafka") +@RequiredArgsConstructor +@Slf4j +public class KafkaServicesApplicationRunner implements ApplicationRunner +{ + private final ThreadPoolTaskExecutor taskExecutor; + private final ChatRoomChannel chatRoomChannel; + private final Consumer chatRoomChannelConsumer; + private final WorkAssignor workAssignor; + + CompletableFuture chatRoomChannelConsumerJob; + + + @Override + public void run(ApplicationArguments args) throws Exception + { + workAssignor.assignWork(chatRoomChannelConsumer); + log.info("Starting the consumer for the ChatRoomChannel"); + chatRoomChannelConsumerJob = taskExecutor + .submitCompletable(chatRoomChannel) + .exceptionally(e -> + { + log.error("The consumer for the ChatRoomChannel exited abnormally!", e); + return null; + }); + } + + @PreDestroy + public void joinChatRoomChannelConsumerJob() + { + log.info("Signaling the consumer of the CahtRoomChannel to quit its work"); + chatRoomChannelConsumer.wakeup(); + log.info("Waiting for the consumer of the ChatRoomChannel to finish its work"); + chatRoomChannelConsumerJob.join(); + log.info("Joined the consumer of the ChatRoomChannel"); + } + + + interface WorkAssignor + { + void assignWork(Consumer consumer); + } +}