WIP: shard assigned/revoked events
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / implementation / kafka / ConsumerTaskExecutor.java
index b635dfc..9425bdf 100644 (file)
@@ -5,58 +5,43 @@ import jakarta.annotation.PreDestroy;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.Consumer;
-import org.springframework.boot.ApplicationArguments;
-import org.springframework.boot.ApplicationRunner;
-import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
 import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
 
-import java.util.List;
 import java.util.concurrent.CompletableFuture;
 
 
-@ConditionalOnProperty(
-    prefix = "chat.backend",
-    name = "services",
-    havingValue = "kafka")
 @RequiredArgsConstructor
 @Slf4j
-public class KafkaServicesApplicationRunner implements ApplicationRunner
+public class ConsumerTaskExecutor
 {
   private final ThreadPoolTaskExecutor taskExecutor;
-  private final ChatRoomChannel chatRoomChannel;
-  private final Consumer<String, AbstractMessageTo> chatRoomChannelConsumer;
+  private final Runnable consumerTask;
+  private final Consumer<String, AbstractMessageTo> consumer;
   private final WorkAssignor workAssignor;
 
-  CompletableFuture<Void> chatRoomChannelConsumerJob;
+  CompletableFuture<Void> consumerTaskJob;
 
 
-  @Override
-  public void run(ApplicationArguments args) throws Exception
+  public void executeConsumerTask()
   {
-    workAssignor.assignWork(chatRoomChannelConsumer);
-    log.info("Starting the consumer for the ChatRoomChannel");
-    chatRoomChannelConsumerJob = taskExecutor
-        .submitCompletable(chatRoomChannel)
+    workAssignor.assignWork(consumer);
+    log.info("Starting the consumer-task for {}", consumerTask);
+    consumerTaskJob = taskExecutor
+        .submitCompletable(consumerTask)
         .exceptionally(e ->
         {
-          log.error("The consumer for the ChatRoomChannel exited abnormally!", e);
+          log.error("The consumer-task for {} exited abnormally!", consumerTask, e);
           return null;
         });
   }
 
   @PreDestroy
-  public void joinChatRoomChannelConsumerJob()
+  public void joinConsumerTaskJob()
   {
-    log.info("Signaling the consumer of the CahtRoomChannel to quit its work");
-    chatRoomChannelConsumer.wakeup();
-    log.info("Waiting for the consumer of the ChatRoomChannel to finish its work");
-    chatRoomChannelConsumerJob.join();
-    log.info("Joined the consumer of the ChatRoomChannel");
-  }
-
-
-  interface WorkAssignor
-  {
-    void assignWork(Consumer<?, ?> consumer);
+    log.info("Signaling the consumer-task for {} to quit its work", consumerTask);
+    consumer.wakeup();
+    log.info("Waiting for the consumer of {} to finish its work", consumerTask);
+    consumerTaskJob.join();
+    log.info("Joined the consumer-task for {}", consumerTask);
   }
 }