feat: Implemented and configured health-indicator for the ``Channel``s
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / implementation / kafka / ChannelTaskRunner.java
index c2c2801..5e56528 100644 (file)
@@ -6,26 +6,33 @@ import lombok.extern.slf4j.Slf4j;
 
 @RequiredArgsConstructor
 @Slf4j
-public class ConsumerTaskRunner
+public class ChannelTaskRunner
 {
-  private final ConsumerTaskExecutor infoChannelConsumerTaskExecutor;
-  private final ConsumerTaskExecutor dataChannelConsumerTaskExecutor;
-  private final InfoChannel infoChannel;
+  private final ChannelTaskExecutor infoChannelTaskExecutor;
+  private final ChannelTaskExecutor dataChannelTaskExecutor;
 
-  public void executeConsumerTasks()
+  public void executeChannels()
   {
-    infoChannelConsumerTaskExecutor.executeConsumerTask();
-    dataChannelConsumerTaskExecutor.executeConsumerTask();
+    infoChannelTaskExecutor.executeChannelTask();
+    dataChannelTaskExecutor.executeChannelTask();
   }
 
-  public void joinConsumerTasks() throws InterruptedException
+  public void joinChannels() throws InterruptedException
   {
-    dataChannelConsumerTaskExecutor.joinConsumerTaskJob();
-    while (infoChannel.isLoadInProgress())
+    joinChannel(dataChannelTaskExecutor);
+    joinChannel(infoChannelTaskExecutor);
+  }
+
+  private void joinChannel(
+      ChannelTaskExecutor channelTaskExecutor)
+      throws InterruptedException
+  {
+    Channel channel = channelTaskExecutor.getChannel();
+    while (channel.getChannelState() != ChannelState.SHUTTING_DOWN)
     {
-      log.info("Waiting for {} to finish loading...", infoChannel);
+      log.info("Waiting for {} to shut down...", channel);
       Thread.sleep(1000);
     }
-    infoChannelConsumerTaskExecutor.joinConsumerTaskJob();
+    channelTaskExecutor.join();
   }
 }