WIP:refactor: Refined channel-states, introduced `ChannelState` -- ALIGN
authorKai Moritz <kai@juplo.de>
Mon, 4 Mar 2024 14:33:11 +0000 (15:33 +0100)
committerKai Moritz <kai@juplo.de>
Mon, 4 Mar 2024 14:33:11 +0000 (15:33 +0100)
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ChannelTaskExecutor.java
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ChannelTaskRunner.java
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/InfoChannel.java
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java

index e61ecfe..d274cf2 100644 (file)
@@ -38,7 +38,7 @@ public class ChannelTaskExecutor
   }
 
   @PreDestroy
-  public void joinConsumerTaskJob()
+  public void join()
   {
     log.info("Signaling the consumer-task for {} to quit its work", channel);
     consumer.wakeup();
index 015f7d5..9e8576d 100644 (file)
@@ -19,14 +19,20 @@ public class ChannelTaskRunner
 
   public void joinChannel() throws InterruptedException
   {
-    dataChannelTaskExecutor.joinConsumerTaskJob();
-    while (infoChannel.getChannelState() != ChannelState.SHUTTING_DOWN)
+    joinChannel(dataChannelTaskExecutor);
+    joinChannel(infoChannelTaskExecutor);
+  }
+
+  private void joinChannel(
+      ChannelTaskExecutor channelTaskExecutor)
+      throws InterruptedException
+  {
+    Channel channel = channelTaskExecutor.getChannel();
+    while (channel.getChannelState() != ChannelState.SHUTTING_DOWN)
     {
-      log.info("Waiting for {} to shut down...", infoChannel);
+      log.info("Waiting for {} to shut down...", channel);
       Thread.sleep(1000);
     }
-    infoChannelTaskExecutor.joinConsumerTaskJob();
+    channelTaskExecutor.join();
   }
-
-  private void joinChannel()
 }
index 0b572f4..2468af5 100644 (file)
@@ -230,6 +230,7 @@ public class DataChannel implements Channel, ConsumerRebalanceListener
       catch (WakeupException e)
       {
         log.info("Received WakeupException, exiting!");
+        channelState = ChannelState.SHUTTING_DOWN;
         running = false;
       }
     }
index ec4a5eb..7665fae 100644 (file)
@@ -210,6 +210,7 @@ public class InfoChannel implements Channel
       catch (WakeupException e)
       {
         log.info("Received WakeupException, exiting!");
+        channelState = ChannelState.SHUTTING_DOWN;
         running = false;
       }
     }
index 1088cab..857f8ec 100644 (file)
@@ -46,8 +46,7 @@ public class KafkaServicesConfiguration
   {
     return new ChannelTaskRunner(
         infoChannelTaskExecutor,
-        dataChannelTaskExecutor,
-        infoChannel);
+        dataChannelTaskExecutor);
   }
 
   @Bean