636c03bcdcc627b8e77b373f5aab3d9d816a4869
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / implementation / kafka / ChannelTaskExecutor.java
1 package de.juplo.kafka.chat.backend.implementation.kafka;
2
3 import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo;
4 import jakarta.annotation.PreDestroy;
5 import lombok.Getter;
6 import lombok.RequiredArgsConstructor;
7 import lombok.extern.slf4j.Slf4j;
8 import org.apache.kafka.clients.consumer.Consumer;
9 import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
10
11 import java.util.concurrent.CompletableFuture;
12
13
14 @RequiredArgsConstructor
15 @Slf4j
16 public class ChannelTaskExecutor
17 {
18   private final ThreadPoolTaskExecutor taskExecutor;
19   @Getter
20   private final Channel channel;
21   private final Consumer<String, AbstractMessageTo> consumer;
22   private final WorkAssignor workAssignor;
23
24   CompletableFuture<Void> channelTaskJob;
25
26
27   public void executeChannelTask()
28   {
29     workAssignor.assignWork(consumer);
30     log.info("Starting the consumer-task for {}", channel);
31     channelTaskJob = taskExecutor
32         .submitCompletable(channel)
33         .exceptionally(e ->
34         {
35           log.error("The consumer-task for {} exited abnormally!", channel, e);
36           return null;
37         });
38   }
39
40   @PreDestroy
41   public void join()
42   {
43     log.info("Signaling the consumer-task for {} to quit its work", channel);
44     consumer.wakeup();
45     log.info("Waiting for the consumer of {} to finish its work", channel);
46     channelTaskJob.join();
47     log.info("Joined the consumer-task for {}", channel);
48   }
49 }