fix: The actual position has to be requested from the consumer
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / implementation / kafka / ConsumerTaskExecutor.java
1 package de.juplo.kafka.chat.backend.implementation.kafka;
2
3 import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo;
4 import jakarta.annotation.PreDestroy;
5 import lombok.RequiredArgsConstructor;
6 import lombok.extern.slf4j.Slf4j;
7 import org.apache.kafka.clients.consumer.Consumer;
8 import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
9
10 import java.util.concurrent.CompletableFuture;
11
12
13 @RequiredArgsConstructor
14 @Slf4j
15 public class ConsumerTaskExecutor
16 {
17   private final ThreadPoolTaskExecutor taskExecutor;
18   private final Runnable consumerTask;
19   private final Consumer<String, AbstractMessageTo> consumer;
20   private final WorkAssignor workAssignor;
21
22   CompletableFuture<Void> consumerTaskJob;
23
24
25   public void executeConsumerTask()
26   {
27     workAssignor.assignWork(consumer);
28     log.info("Starting the consumer-task for {}", consumerTask);
29     consumerTaskJob = taskExecutor
30         .submitCompletable(consumerTask)
31         .exceptionally(e ->
32         {
33           log.error("The consumer-task for {} exited abnormally!", consumerTask, e);
34           return null;
35         });
36   }
37
38   @PreDestroy
39   public void joinConsumerTaskJob()
40   {
41     log.info("Signaling the consumer-task for {} to quit its work", consumerTask);
42     consumer.wakeup();
43     log.info("Waiting for the consumer of {} to finish its work", consumerTask);
44     consumerTaskJob.join();
45     log.info("Joined the consumer-task for {}", consumerTask);
46   }
47 }