refactor: Refined channel-states, introduced `ChannelState` -- MOVE
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / implementation / kafka / ChannelTaskExecutor.java
diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ChannelTaskExecutor.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ChannelTaskExecutor.java
new file mode 100644 (file)
index 0000000..9425bdf
--- /dev/null
@@ -0,0 +1,47 @@
+package de.juplo.kafka.chat.backend.implementation.kafka;
+
+import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo;
+import jakarta.annotation.PreDestroy;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
+
+import java.util.concurrent.CompletableFuture;
+
+
+@RequiredArgsConstructor
+@Slf4j
+public class ConsumerTaskExecutor
+{
+  private final ThreadPoolTaskExecutor taskExecutor;
+  private final Runnable consumerTask;
+  private final Consumer<String, AbstractMessageTo> consumer;
+  private final WorkAssignor workAssignor;
+
+  CompletableFuture<Void> consumerTaskJob;
+
+
+  public void executeConsumerTask()
+  {
+    workAssignor.assignWork(consumer);
+    log.info("Starting the consumer-task for {}", consumerTask);
+    consumerTaskJob = taskExecutor
+        .submitCompletable(consumerTask)
+        .exceptionally(e ->
+        {
+          log.error("The consumer-task for {} exited abnormally!", consumerTask, e);
+          return null;
+        });
+  }
+
+  @PreDestroy
+  public void joinConsumerTaskJob()
+  {
+    log.info("Signaling the consumer-task for {} to quit its work", consumerTask);
+    consumer.wakeup();
+    log.info("Waiting for the consumer of {} to finish its work", consumerTask);
+    consumerTaskJob.join();
+    log.info("Joined the consumer-task for {}", consumerTask);
+  }
+}