TMP:test -- FIX: `ChatRoomData` active/inactive
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / implementation / kafka / ChannelTaskExecutor.java
1 package de.juplo.kafka.chat.backend.implementation.kafka;
2
3 import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo;
4 import lombok.Getter;
5 import lombok.RequiredArgsConstructor;
6 import lombok.extern.slf4j.Slf4j;
7 import org.apache.kafka.clients.consumer.Consumer;
8 import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
9
10 import java.util.concurrent.CompletableFuture;
11
12
13 @RequiredArgsConstructor
14 @Slf4j
15 public class ChannelTaskExecutor
16 {
17   private final ThreadPoolTaskExecutor taskExecutor;
18   @Getter
19   private final Channel channel;
20   private final Consumer<String, AbstractMessageTo> consumer;
21   private final WorkAssignor workAssignor;
22
23   CompletableFuture<Void> channelTaskJob;
24
25
26   public void executeChannelTask()
27   {
28     workAssignor.assignWork(consumer);
29     log.info("Starting the consumer-task for {}", channel);
30     channelTaskJob = taskExecutor
31         .submitCompletable(channel)
32         .exceptionally(e ->
33         {
34           log.error("The consumer-task for {} exited abnormally!", channel, e);
35           return null;
36         });
37   }
38
39   public void join()
40   {
41     log.info("Signaling the consumer-task for {} to quit its work", channel);
42     consumer.wakeup();
43     log.info("Waiting for the consumer of {} to finish its work", channel);
44     channelTaskJob.join();
45     log.info("Joined the consumer-task for {}", channel);
46   }
47 }