1 package de.juplo.kafka.chat.backend.persistence.kafka;
3 import jakarta.annotation.PreDestroy;
4 import lombok.extern.slf4j.Slf4j;
5 import org.apache.kafka.clients.consumer.Consumer;
6 import org.springframework.beans.factory.annotation.Autowired;
7 import org.springframework.boot.ApplicationArguments;
8 import org.springframework.boot.ApplicationRunner;
9 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
10 import org.springframework.context.ConfigurableApplicationContext;
11 import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
12 import org.springframework.stereotype.Component;
14 import java.util.concurrent.CompletableFuture;
/**
 * {@link ApplicationRunner} that starts the consumers for the ChatRoomChannel
 * and the ChatMessageChannel on the {@code taskExecutor} and keeps the
 * resulting jobs, so that the consumers can be woken up and joined later on.
 * <p>
 * NOTE(review): only the {@code prefix} and {@code havingValue} attributes of
 * the {@code @ConditionalOnProperty} declaration are visible in this listing;
 * the property name is not shown -- confirm against the full file.
 */
17 @ConditionalOnProperty(
18 prefix = "chat.backend",
20 havingValue = "kafka")
23 public class KafkaServicesApplicationRunner implements ApplicationRunner
// NOTE(review): the fields below are presumably injected (the file imports
// @Autowired), but no field annotations are visible in this listing -- confirm.

// Executor that both channels are submitted to in run().
26 ThreadPoolTaskExecutor taskExecutor;
// Application context; not referenced by the code visible in this listing.
28 ConfigurableApplicationContext context;
// Task that is submitted to the executor as the ChatRoomChannel consumer job.
31 ChatRoomChannel chatRoomChannel;
// Kafka consumer that is woken up in joinChatRoomChannelConsumerJob();
// presumably the consumer that chatRoomChannel polls -- verify.
33 Consumer<Integer, CreateChatRoomRequestTo> chatRoomChannelConsumer;
// Task that is submitted to the executor as the ChatMessageChannel consumer job.
35 ChatMessageChannel chatMessageChannel;
// Kafka consumer that is woken up in joinChatMessageChannelConsumerJob();
// presumably the consumer that chatMessageChannel polls -- verify.
37 Consumer<String, ChatMessageTo> chatMessageChannelConsumer;
// Futures of the two submitted jobs: assigned in run(), joined in the
// corresponding join...ConsumerJob() method.
39 CompletableFuture<Void> chatRoomChannelConsumerJob;
40 CompletableFuture<Void> chatMessageChannelConsumerJob;
/**
 * Starts the consumers for the ChatRoomChannel and the ChatMessageChannel.
 * <p>
 * Each channel is handed to {@code taskExecutor.submitCompletable(...)} and
 * the resulting future is stored in the matching {@code ...ConsumerJob}
 * field, where the join methods of this class pick it up later.
 *
 * @param args the application arguments; not used by the visible code
 * @throws Exception declared in the signature; nothing in the visible lines
 *     throws explicitly
 */
44 public void run(ApplicationArguments args) throws Exception
// Submit the ChatRoomChannel and remember the future of the job.
46 log.info("Starting the consumer for the ChatRoomChannel");
47 chatRoomChannelConsumerJob = taskExecutor
48 .submitCompletable(chatRoomChannel)
// NOTE(review): `e` is bound by a callback whose declaration is not visible
// in this listing (presumably an exceptionally(...) handler) -- confirm.
51 log.error("The consumer for the ChatRoomChannel exited abnormally!", e);
// Same procedure for the ChatMessageChannel.
54 log.info("Starting the consumer for the ChatMessageChannel");
55 chatMessageChannelConsumerJob = taskExecutor
56 .submitCompletable(chatMessageChannel)
// NOTE(review): as above, the callback that binds `e` is not visible here.
59 log.error("The consumer for the ChatMessageChannel exited abnormally!", e);
65 public void joinChatRoomChannelConsumerJob()
67 log.info("Signaling the consumer of the CahtRoomChannel to quit its work");
68 chatRoomChannelConsumer.wakeup();
69 log.info("Waiting for the consumer of the ChatRoomChannel to finish its work");
70 chatRoomChannelConsumerJob.join();
71 log.info("Joined the consumer of the ChatRoomChannel");
75 public void joinChatMessageChannelConsumerJob()
77 log.info("Signaling the consumer of the CahtRoomChannel to quit its work");
78 chatMessageChannelConsumer.wakeup();
79 log.info("Waiting for the consumer of the ChatMessageChannel to finish its work");
80 chatMessageChannelConsumerJob.join();
81 log.info("Joined the consumer of the ChatMessageChannel");