WIP:refactor: Refined channel-states, introduced `ChannelState` -- MOVE
authorKai Moritz <kai@juplo.de>
Mon, 4 Mar 2024 14:05:14 +0000 (15:05 +0100)
committerKai Moritz <kai@juplo.de>
Mon, 4 Mar 2024 14:05:14 +0000 (15:05 +0100)
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ChannelExecutor.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ChannelRunner.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ConsumerTaskExecutor.java [deleted file]
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ConsumerTaskRunner.java [deleted file]

diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ChannelExecutor.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ChannelExecutor.java
new file mode 100644 (file)
index 0000000..9425bdf
--- /dev/null
@@ -0,0 +1,47 @@
+package de.juplo.kafka.chat.backend.implementation.kafka;
+
+import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo;
+import jakarta.annotation.PreDestroy;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
+
+import java.util.concurrent.CompletableFuture;
+
+
+@RequiredArgsConstructor
+@Slf4j
+public class ConsumerTaskExecutor
+{
+  private final ThreadPoolTaskExecutor taskExecutor;
+  private final Runnable consumerTask;
+  private final Consumer<String, AbstractMessageTo> consumer;
+  private final WorkAssignor workAssignor;
+
+  CompletableFuture<Void> consumerTaskJob;
+
+
+  public void executeConsumerTask()
+  {
+    workAssignor.assignWork(consumer);
+    log.info("Starting the consumer-task for {}", consumerTask);
+    consumerTaskJob = taskExecutor
+        .submitCompletable(consumerTask)
+        .exceptionally(e ->
+        {
+          log.error("The consumer-task for {} exited abnormally!", consumerTask, e);
+          return null;
+        });
+  }
+
+  @PreDestroy
+  public void joinConsumerTaskJob()
+  {
+    log.info("Signaling the consumer-task for {} to quit its work", consumerTask);
+    consumer.wakeup();
+    log.info("Waiting for the consumer of {} to finish its work", consumerTask);
+    consumerTaskJob.join();
+    log.info("Joined the consumer-task for {}", consumerTask);
+  }
+}
diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ChannelRunner.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ChannelRunner.java
new file mode 100644 (file)
index 0000000..0c6de1d
--- /dev/null
@@ -0,0 +1,31 @@
+package de.juplo.kafka.chat.backend.implementation.kafka;
+
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
+
+@RequiredArgsConstructor
+@Slf4j
+public class ConsumerTaskRunner
+{
+  private final ConsumerTaskExecutor infoChannelConsumerTaskExecutor;
+  private final ConsumerTaskExecutor dataChannelConsumerTaskExecutor;
+  private final InfoChannel infoChannel;
+
+  public void executeConsumerTasks()
+  {
+    infoChannelConsumerTaskExecutor.executeConsumerTask();
+    dataChannelConsumerTaskExecutor.executeConsumerTask();
+  }
+
+  public void joinConsumerTasks() throws InterruptedException
+  {
+    dataChannelConsumerTaskExecutor.joinConsumerTaskJob();
+    while (infoChannel.getChannelState() != ChannelState.SHUTTING_DOWN)
+    {
+      log.info("Waiting for {} to shut down...", infoChannel);
+      Thread.sleep(1000);
+    }
+    infoChannelConsumerTaskExecutor.joinConsumerTaskJob();
+  }
+}
diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ConsumerTaskExecutor.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ConsumerTaskExecutor.java
deleted file mode 100644 (file)
index 9425bdf..0000000
+++ /dev/null
@@ -1,47 +0,0 @@
-package de.juplo.kafka.chat.backend.implementation.kafka;
-
-import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo;
-import jakarta.annotation.PreDestroy;
-import lombok.RequiredArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.consumer.Consumer;
-import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
-
-import java.util.concurrent.CompletableFuture;
-
-
-@RequiredArgsConstructor
-@Slf4j
-public class ConsumerTaskExecutor
-{
-  private final ThreadPoolTaskExecutor taskExecutor;
-  private final Runnable consumerTask;
-  private final Consumer<String, AbstractMessageTo> consumer;
-  private final WorkAssignor workAssignor;
-
-  CompletableFuture<Void> consumerTaskJob;
-
-
-  public void executeConsumerTask()
-  {
-    workAssignor.assignWork(consumer);
-    log.info("Starting the consumer-task for {}", consumerTask);
-    consumerTaskJob = taskExecutor
-        .submitCompletable(consumerTask)
-        .exceptionally(e ->
-        {
-          log.error("The consumer-task for {} exited abnormally!", consumerTask, e);
-          return null;
-        });
-  }
-
-  @PreDestroy
-  public void joinConsumerTaskJob()
-  {
-    log.info("Signaling the consumer-task for {} to quit its work", consumerTask);
-    consumer.wakeup();
-    log.info("Waiting for the consumer of {} to finish its work", consumerTask);
-    consumerTaskJob.join();
-    log.info("Joined the consumer-task for {}", consumerTask);
-  }
-}
diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ConsumerTaskRunner.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ConsumerTaskRunner.java
deleted file mode 100644 (file)
index 0c6de1d..0000000
+++ /dev/null
@@ -1,31 +0,0 @@
-package de.juplo.kafka.chat.backend.implementation.kafka;
-
-import lombok.RequiredArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
-
-
-@RequiredArgsConstructor
-@Slf4j
-public class ConsumerTaskRunner
-{
-  private final ConsumerTaskExecutor infoChannelConsumerTaskExecutor;
-  private final ConsumerTaskExecutor dataChannelConsumerTaskExecutor;
-  private final InfoChannel infoChannel;
-
-  public void executeConsumerTasks()
-  {
-    infoChannelConsumerTaskExecutor.executeConsumerTask();
-    dataChannelConsumerTaskExecutor.executeConsumerTask();
-  }
-
-  public void joinConsumerTasks() throws InterruptedException
-  {
-    dataChannelConsumerTaskExecutor.joinConsumerTaskJob();
-    while (infoChannel.getChannelState() != ChannelState.SHUTTING_DOWN)
-    {
-      log.info("Waiting for {} to shut down...", infoChannel);
-      Thread.sleep(1000);
-    }
-    infoChannelConsumerTaskExecutor.joinConsumerTaskJob();
-  }
-}