WIP:fix:activation
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / implementation / kafka / ChannelTaskExecutor.java
index 9425bdf..4520677 100644 (file)
@@ -1,7 +1,7 @@
 package de.juplo.kafka.chat.backend.implementation.kafka;
 
 import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo;
-import jakarta.annotation.PreDestroy;
+import lombok.Getter;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.Consumer;
@@ -12,36 +12,36 @@ import java.util.concurrent.CompletableFuture;
 
 @RequiredArgsConstructor
 @Slf4j
-public class ConsumerTaskExecutor
+public class ChannelTaskExecutor
 {
   private final ThreadPoolTaskExecutor taskExecutor;
-  private final Runnable consumerTask;
+  @Getter
+  private final Channel channel;
   private final Consumer<String, AbstractMessageTo> consumer;
   private final WorkAssignor workAssignor;
 
-  CompletableFuture<Void> consumerTaskJob;
+  CompletableFuture<Void> channelTaskJob;
 
 
-  public void executeConsumerTask()
+  public void executeChannelTask()
   {
     workAssignor.assignWork(consumer);
-    log.info("Starting the consumer-task for {}", consumerTask);
-    consumerTaskJob = taskExecutor
-        .submitCompletable(consumerTask)
+    log.info("Starting the consumer-task for {}", channel);
+    channelTaskJob = taskExecutor
+        .submitCompletable(channel)
         .exceptionally(e ->
         {
-          log.error("The consumer-task for {} exited abnormally!", consumerTask, e);
+          log.error("The consumer-task for {} exited abnormally!", channel, e);
           return null;
         });
   }
 
-  @PreDestroy
-  public void joinConsumerTaskJob()
+  public void join()
   {
-    log.info("Signaling the consumer-task for {} to quit its work", consumerTask);
+    log.info("Signaling the consumer-task for {} to quit its work", channel);
     consumer.wakeup();
-    log.info("Waiting for the consumer of {} to finish its work", consumerTask);
-    consumerTaskJob.join();
-    log.info("Joined the consumer-task for {}", consumerTask);
+    log.info("Waiting for the consumer of {} to finish its work", channel);
+    channelTaskJob.join();
+    log.info("Joined the consumer-task for {}", channel);
   }
 }