WIP:refactor: Refined channel-states, introduced `ChannelState` -- MOVE
authorKai Moritz <kai@juplo.de>
Mon, 4 Mar 2024 14:10:28 +0000 (15:10 +0100)
committerKai Moritz <kai@juplo.de>
Mon, 4 Mar 2024 14:10:28 +0000 (15:10 +0100)
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ChannelExecutor.java [deleted file]
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ChannelRunner.java [deleted file]
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ChannelTaskExecutor.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ChannelTaskRunner.java [new file with mode: 0644]

diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ChannelExecutor.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ChannelExecutor.java
deleted file mode 100644 (file)
index 087d94d..0000000
+++ /dev/null
@@ -1,49 +0,0 @@
-package de.juplo.kafka.chat.backend.implementation.kafka;
-
-import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo;
-import jakarta.annotation.PreDestroy;
-import lombok.Getter;
-import lombok.RequiredArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.consumer.Consumer;
-import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
-
-import java.util.concurrent.CompletableFuture;
-
-
-@RequiredArgsConstructor
-@Slf4j
-public class ChannelExecutor
-{
-  private final ThreadPoolTaskExecutor taskExecutor;
-  @Getter
-  private final Channel channel;
-  private final Consumer<String, AbstractMessageTo> consumer;
-  private final WorkAssignor workAssignor;
-
-  CompletableFuture<Void> consumerTaskJob;
-
-
-  public void executeConsumerTask()
-  {
-    workAssignor.assignWork(consumer);
-    log.info("Starting the consumer-task for {}", channel);
-    consumerTaskJob = taskExecutor
-        .submitCompletable(channel)
-        .exceptionally(e ->
-        {
-          log.error("The consumer-task for {} exited abnormally!", channel, e);
-          return null;
-        });
-  }
-
-  @PreDestroy
-  public void joinConsumerTaskJob()
-  {
-    log.info("Signaling the consumer-task for {} to quit its work", channel);
-    consumer.wakeup();
-    log.info("Waiting for the consumer of {} to finish its work", channel);
-    consumerTaskJob.join();
-    log.info("Joined the consumer-task for {}", channel);
-  }
-}
diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ChannelRunner.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ChannelRunner.java
deleted file mode 100644 (file)
index 92db79b..0000000
+++ /dev/null
@@ -1,32 +0,0 @@
-package de.juplo.kafka.chat.backend.implementation.kafka;
-
-import lombok.RequiredArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
-
-
-@RequiredArgsConstructor
-@Slf4j
-public class ChannelRunner
-{
-  private final ChannelExecutor infoChannelExecutor;
-  private final ChannelExecutor dataChannelExecutor;
-
-  public void executeChannel()
-  {
-    infoChannelExecutor.executeConsumerTask();
-    dataChannelExecutor.executeConsumerTask();
-  }
-
-  public void joinChannel() throws InterruptedException
-  {
-    dataChannelExecutor.joinConsumerTaskJob();
-    while (infoChannel.getChannelState() != ChannelState.SHUTTING_DOWN)
-    {
-      log.info("Waiting for {} to shut down...", infoChannel);
-      Thread.sleep(1000);
-    }
-    infoChannelExecutor.joinConsumerTaskJob();
-  }
-
-  private void joinChannel()
-}
diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ChannelTaskExecutor.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ChannelTaskExecutor.java
new file mode 100644 (file)
index 0000000..087d94d
--- /dev/null
@@ -0,0 +1,49 @@
+package de.juplo.kafka.chat.backend.implementation.kafka;
+
+import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo;
+import jakarta.annotation.PreDestroy;
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
+
+import java.util.concurrent.CompletableFuture;
+
+
+@RequiredArgsConstructor
+@Slf4j
+public class ChannelExecutor
+{
+  private final ThreadPoolTaskExecutor taskExecutor;
+  @Getter
+  private final Channel channel;
+  private final Consumer<String, AbstractMessageTo> consumer;
+  private final WorkAssignor workAssignor;
+
+  CompletableFuture<Void> consumerTaskJob;
+
+
+  public void executeConsumerTask()
+  {
+    workAssignor.assignWork(consumer);
+    log.info("Starting the consumer-task for {}", channel);
+    consumerTaskJob = taskExecutor
+        .submitCompletable(channel)
+        .exceptionally(e ->
+        {
+          log.error("The consumer-task for {} exited abnormally!", channel, e);
+          return null;
+        });
+  }
+
+  @PreDestroy
+  public void joinConsumerTaskJob()
+  {
+    log.info("Signaling the consumer-task for {} to quit its work", channel);
+    consumer.wakeup();
+    log.info("Waiting for the consumer of {} to finish its work", channel);
+    consumerTaskJob.join();
+    log.info("Joined the consumer-task for {}", channel);
+  }
+}
diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ChannelTaskRunner.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ChannelTaskRunner.java
new file mode 100644 (file)
index 0000000..92db79b
--- /dev/null
@@ -0,0 +1,32 @@
+package de.juplo.kafka.chat.backend.implementation.kafka;
+
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
+
+@RequiredArgsConstructor
+@Slf4j
+public class ChannelRunner
+{
+  private final ChannelExecutor infoChannelExecutor;
+  private final ChannelExecutor dataChannelExecutor;
+
+  public void executeChannel()
+  {
+    infoChannelExecutor.executeConsumerTask();
+    dataChannelExecutor.executeConsumerTask();
+  }
+
+  public void joinChannel() throws InterruptedException
+  {
+    dataChannelExecutor.joinConsumerTaskJob();
+    while (infoChannel.getChannelState() != ChannelState.SHUTTING_DOWN)
+    {
+      log.info("Waiting for {} to shut down...", infoChannel);
+      Thread.sleep(1000);
+    }
+    infoChannelExecutor.joinConsumerTaskJob();
+  }
+
+  private void joinChannel()
+}