refactor: `KafkaChatHomeServiceTest` reuses regular startup-logic
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / implementation / kafka / KafkaServicesApplicationRunner.java
1 package de.juplo.kafka.chat.backend.implementation.kafka;
2
3 import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo;
4 import jakarta.annotation.PreDestroy;
5 import lombok.RequiredArgsConstructor;
6 import lombok.extern.slf4j.Slf4j;
7 import org.apache.kafka.clients.consumer.Consumer;
8 import org.springframework.boot.ApplicationArguments;
9 import org.springframework.boot.ApplicationRunner;
10 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
11 import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
12
13 import java.util.List;
14 import java.util.concurrent.CompletableFuture;
15
16
17 @ConditionalOnProperty(
18     prefix = "chat.backend",
19     name = "services",
20     havingValue = "kafka")
21 @RequiredArgsConstructor
22 @Slf4j
23 public class KafkaServicesApplicationRunner implements ApplicationRunner
24 {
25   private final ThreadPoolTaskExecutor taskExecutor;
26   private final ChatRoomChannel chatRoomChannel;
27   private final Consumer<String, AbstractMessageTo> chatRoomChannelConsumer;
28   private final WorkAssignor workAssignor;
29
30   CompletableFuture<Void> chatRoomChannelConsumerJob;
31
32
33   @Override
34   public void run(ApplicationArguments args) throws Exception
35   {
36     workAssignor.assignWork(chatRoomChannelConsumer);
37     log.info("Starting the consumer for the ChatRoomChannel");
38     chatRoomChannelConsumerJob = taskExecutor
39         .submitCompletable(chatRoomChannel)
40         .exceptionally(e ->
41         {
42           log.error("The consumer for the ChatRoomChannel exited abnormally!", e);
43           return null;
44         });
45   }
46
47   @PreDestroy
48   public void joinChatRoomChannelConsumerJob()
49   {
50     log.info("Signaling the consumer of the CahtRoomChannel to quit its work");
51     chatRoomChannelConsumer.wakeup();
52     log.info("Waiting for the consumer of the ChatRoomChannel to finish its work");
53     chatRoomChannelConsumerJob.join();
54     log.info("Joined the consumer of the ChatRoomChannel");
55   }
56
57
58   interface WorkAssignor
59   {
60     void assignWork(Consumer<?, ?> consumer);
61   }
62 }