1 package de.juplo.kafka.chat.backend.implementation.kafka;
3 import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo;
4 import jakarta.annotation.PreDestroy;
6 import lombok.RequiredArgsConstructor;
7 import lombok.extern.slf4j.Slf4j;
8 import org.apache.kafka.clients.consumer.Consumer;
9 import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
11 import java.util.concurrent.CompletableFuture;
14 @RequiredArgsConstructor
16 public class ChannelTaskExecutor
18 private final ThreadPoolTaskExecutor taskExecutor;
20 private final Channel channel;
21 private final Consumer<String, AbstractMessageTo> consumer;
22 private final WorkAssignor workAssignor;
24 CompletableFuture<Void> channelTaskJob;
27 public void executeChannelTask()
29 workAssignor.assignWork(consumer);
30 log.info("Starting the consumer-task for {}", channel);
31 channelTaskJob = taskExecutor
32 .submitCompletable(channel)
35 log.error("The consumer-task for {} exited abnormally!", channel, e);
43 log.info("Signaling the consumer-task for {} to quit its work", channel);
45 log.info("Waiting for the consumer of {} to finish its work", channel);
46 channelTaskJob.join();
47 log.info("Joined the consumer-task for {}", channel);