From 21f92bb7319fee9906ebf9fbf8535ef6ad1b955b Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Thu, 14 Sep 2023 23:41:46 +0200 Subject: [PATCH] refactor: Introduced `ConsumerTaskExecutor` -- Copied class --- .../kafka/ConsumerTaskExecutor.java | 62 +++++++++++++++++++ 1 file changed, 62 insertions(+) create mode 100644 src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ConsumerTaskExecutor.java diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ConsumerTaskExecutor.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ConsumerTaskExecutor.java new file mode 100644 index 00000000..b635dfc1 --- /dev/null +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ConsumerTaskExecutor.java @@ -0,0 +1,62 @@ +package de.juplo.kafka.chat.backend.implementation.kafka; + +import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo; +import jakarta.annotation.PreDestroy; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.Consumer; +import org.springframework.boot.ApplicationArguments; +import org.springframework.boot.ApplicationRunner; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; + +import java.util.List; +import java.util.concurrent.CompletableFuture; + + +@ConditionalOnProperty( + prefix = "chat.backend", + name = "services", + havingValue = "kafka") +@RequiredArgsConstructor +@Slf4j +public class KafkaServicesApplicationRunner implements ApplicationRunner +{ + private final ThreadPoolTaskExecutor taskExecutor; + private final ChatRoomChannel chatRoomChannel; + private final Consumer chatRoomChannelConsumer; + private final WorkAssignor workAssignor; + + CompletableFuture chatRoomChannelConsumerJob; + + + @Override + public void run(ApplicationArguments args) throws Exception + { + workAssignor.assignWork(chatRoomChannelConsumer); + log.info("Starting the consumer for the ChatRoomChannel"); + chatRoomChannelConsumerJob = taskExecutor + .submitCompletable(chatRoomChannel) + .exceptionally(e -> + { + log.error("The consumer for the ChatRoomChannel exited abnormally!", e); + return null; + }); + } + + @PreDestroy + public void joinChatRoomChannelConsumerJob() + { + log.info("Signaling the consumer of the CahtRoomChannel to quit its work"); + chatRoomChannelConsumer.wakeup(); + log.info("Waiting for the consumer of the ChatRoomChannel to finish its work"); + chatRoomChannelConsumerJob.join(); + log.info("Joined the consumer of the ChatRoomChannel"); + } + + + interface WorkAssignor + { + void assignWork(Consumer consumer); + } +} -- 2.20.1