9d8539f37fa40ad356fd8c0d29b0f1dc6281239f
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / implementation / kafka / KafkaServicesApplicationRunner.java
1 package de.juplo.kafka.chat.backend.implementation.kafka;
2
3 import jakarta.annotation.PreDestroy;
4 import lombok.RequiredArgsConstructor;
5 import lombok.extern.slf4j.Slf4j;
6 import org.apache.kafka.clients.consumer.Consumer;
7 import org.springframework.boot.ApplicationArguments;
8 import org.springframework.boot.ApplicationRunner;
9 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
10 import org.springframework.stereotype.Component;
11
12
13 @ConditionalOnProperty(
14     prefix = "chat.backend",
15     name = "services",
16     havingValue = "kafka")
17 @Component
18 @RequiredArgsConstructor
19 @Slf4j
20 public class KafkaServicesApplicationRunner implements ApplicationRunner
21 {
22   private final ChannelTaskRunner channelTaskRunner;
23   private final Consumer dataChannelConsumer;
24   private final Consumer infoChannelConsumer;
25
26
27   @Override
28   public void run(ApplicationArguments args)
29   {
30     log.info("Executing channel-tasks");
31     channelTaskRunner.executeChannels();
32   }
33
34   @PreDestroy
35   public void joinChannels() throws InterruptedException
36   {
37     log.info("Closing consumers");
38     dataChannelConsumer.close();
39     infoChannelConsumer.close();
40     log.info("Joining channel-tasks");
41     channelTaskRunner.joinChannels();
42   }
43 }