1 package de.juplo.kafka.chat.backend.implementation.kafka;
3 import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo;
4 import jakarta.annotation.PreDestroy;
5 import lombok.RequiredArgsConstructor;
6 import lombok.extern.slf4j.Slf4j;
7 import org.apache.kafka.clients.consumer.Consumer;
8 import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
10 import java.util.concurrent.CompletableFuture;
13 @RequiredArgsConstructor
15 public class ConsumerTaskExecutor
17 private final ThreadPoolTaskExecutor taskExecutor;
18 private final Runnable consumerTask;
19 private final Consumer<String, AbstractMessageTo> consumer;
20 private final WorkAssignor workAssignor;
22 CompletableFuture<Void> consumerTaskJob;
25 public void executeConsumerTask()
27 workAssignor.assignWork(consumer);
28 log.info("Starting the consumer-task for {}", consumerTask);
29 consumerTaskJob = taskExecutor
30 .submitCompletable(consumerTask)
33 log.error("The consumer-task for {} exited abnormally!", consumerTask, e);
39 public void joinConsumerTaskJob()
41 log.info("Signaling the consumer-task for {} to quit its work", consumerTask);
43 log.info("Waiting for the consumer of {} to finish its work", consumerTask);
44 consumerTaskJob.join();
45 log.info("Joined the consumer-task for {}", consumerTask);