refactor: Refined channel-states, introduced `ChannelState` -- MOVE
authorKai Moritz <kai@juplo.de>
Mon, 4 Mar 2024 13:32:14 +0000 (14:32 +0100)
committerKai Moritz <kai@juplo.de>
Thu, 14 Mar 2024 08:11:21 +0000 (09:11 +0100)
* Renamed and moved `LoadInProgressException`
** Moved exception into implementation-specific package
** Renamed exception to `ChannelNotReadyException`
* Renamed `ConsumerTaskExecutor` into `ChannelTaskExecutor`
* Renamed `ConsumerTaskRunner` into `ChannelTaskRunner`

src/main/java/de/juplo/kafka/chat/backend/domain/exceptions/LoadInProgressException.java [deleted file]
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ChannelNotReadyException.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ChannelTaskExecutor.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ChannelTaskRunner.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ConsumerTaskExecutor.java [deleted file]
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ConsumerTaskRunner.java [deleted file]

diff --git a/src/main/java/de/juplo/kafka/chat/backend/domain/exceptions/LoadInProgressException.java b/src/main/java/de/juplo/kafka/chat/backend/domain/exceptions/LoadInProgressException.java
deleted file mode 100644 (file)
index 8a0a81f..0000000
+++ /dev/null
@@ -1,10 +0,0 @@
-package de.juplo.kafka.chat.backend.domain.exceptions;
-
-
-public class LoadInProgressException extends IllegalStateException
-{
-  public LoadInProgressException()
-  {
-    super("Load in progress...");
-  }
-}
diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ChannelNotReadyException.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ChannelNotReadyException.java
new file mode 100644 (file)
index 0000000..8a0a81f
--- /dev/null
@@ -0,0 +1,10 @@
+package de.juplo.kafka.chat.backend.domain.exceptions;
+
+
+public class LoadInProgressException extends IllegalStateException
+{
+  public LoadInProgressException()
+  {
+    super("Load in progress...");
+  }
+}
diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ChannelTaskExecutor.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ChannelTaskExecutor.java
new file mode 100644 (file)
index 0000000..9425bdf
--- /dev/null
@@ -0,0 +1,47 @@
+package de.juplo.kafka.chat.backend.implementation.kafka;
+
+import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo;
+import jakarta.annotation.PreDestroy;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
+
+import java.util.concurrent.CompletableFuture;
+
+
+@RequiredArgsConstructor
+@Slf4j
+public class ConsumerTaskExecutor
+{
+  private final ThreadPoolTaskExecutor taskExecutor;
+  private final Runnable consumerTask;
+  private final Consumer<String, AbstractMessageTo> consumer;
+  private final WorkAssignor workAssignor;
+
+  CompletableFuture<Void> consumerTaskJob;
+
+
+  public void executeConsumerTask()
+  {
+    workAssignor.assignWork(consumer);
+    log.info("Starting the consumer-task for {}", consumerTask);
+    consumerTaskJob = taskExecutor
+        .submitCompletable(consumerTask)
+        .exceptionally(e ->
+        {
+          log.error("The consumer-task for {} exited abnormally!", consumerTask, e);
+          return null;
+        });
+  }
+
+  @PreDestroy
+  public void joinConsumerTaskJob()
+  {
+    log.info("Signaling the consumer-task for {} to quit its work", consumerTask);
+    consumer.wakeup();
+    log.info("Waiting for the consumer of {} to finish its work", consumerTask);
+    consumerTaskJob.join();
+    log.info("Joined the consumer-task for {}", consumerTask);
+  }
+}
diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ChannelTaskRunner.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ChannelTaskRunner.java
new file mode 100644 (file)
index 0000000..c2c2801
--- /dev/null
@@ -0,0 +1,31 @@
+package de.juplo.kafka.chat.backend.implementation.kafka;
+
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
+
+@RequiredArgsConstructor
+@Slf4j
+public class ConsumerTaskRunner
+{
+  private final ConsumerTaskExecutor infoChannelConsumerTaskExecutor;
+  private final ConsumerTaskExecutor dataChannelConsumerTaskExecutor;
+  private final InfoChannel infoChannel;
+
+  public void executeConsumerTasks()
+  {
+    infoChannelConsumerTaskExecutor.executeConsumerTask();
+    dataChannelConsumerTaskExecutor.executeConsumerTask();
+  }
+
+  public void joinConsumerTasks() throws InterruptedException
+  {
+    dataChannelConsumerTaskExecutor.joinConsumerTaskJob();
+    while (infoChannel.isLoadInProgress())
+    {
+      log.info("Waiting for {} to finish loading...", infoChannel);
+      Thread.sleep(1000);
+    }
+    infoChannelConsumerTaskExecutor.joinConsumerTaskJob();
+  }
+}
diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ConsumerTaskExecutor.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ConsumerTaskExecutor.java
deleted file mode 100644 (file)
index 9425bdf..0000000
+++ /dev/null
@@ -1,47 +0,0 @@
-package de.juplo.kafka.chat.backend.implementation.kafka;
-
-import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo;
-import jakarta.annotation.PreDestroy;
-import lombok.RequiredArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.consumer.Consumer;
-import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
-
-import java.util.concurrent.CompletableFuture;
-
-
-@RequiredArgsConstructor
-@Slf4j
-public class ConsumerTaskExecutor
-{
-  private final ThreadPoolTaskExecutor taskExecutor;
-  private final Runnable consumerTask;
-  private final Consumer<String, AbstractMessageTo> consumer;
-  private final WorkAssignor workAssignor;
-
-  CompletableFuture<Void> consumerTaskJob;
-
-
-  public void executeConsumerTask()
-  {
-    workAssignor.assignWork(consumer);
-    log.info("Starting the consumer-task for {}", consumerTask);
-    consumerTaskJob = taskExecutor
-        .submitCompletable(consumerTask)
-        .exceptionally(e ->
-        {
-          log.error("The consumer-task for {} exited abnormally!", consumerTask, e);
-          return null;
-        });
-  }
-
-  @PreDestroy
-  public void joinConsumerTaskJob()
-  {
-    log.info("Signaling the consumer-task for {} to quit its work", consumerTask);
-    consumer.wakeup();
-    log.info("Waiting for the consumer of {} to finish its work", consumerTask);
-    consumerTaskJob.join();
-    log.info("Joined the consumer-task for {}", consumerTask);
-  }
-}
diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ConsumerTaskRunner.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ConsumerTaskRunner.java
deleted file mode 100644 (file)
index c2c2801..0000000
+++ /dev/null
@@ -1,31 +0,0 @@
-package de.juplo.kafka.chat.backend.implementation.kafka;
-
-import lombok.RequiredArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
-
-
-@RequiredArgsConstructor
-@Slf4j
-public class ConsumerTaskRunner
-{
-  private final ConsumerTaskExecutor infoChannelConsumerTaskExecutor;
-  private final ConsumerTaskExecutor dataChannelConsumerTaskExecutor;
-  private final InfoChannel infoChannel;
-
-  public void executeConsumerTasks()
-  {
-    infoChannelConsumerTaskExecutor.executeConsumerTask();
-    dataChannelConsumerTaskExecutor.executeConsumerTask();
-  }
-
-  public void joinConsumerTasks() throws InterruptedException
-  {
-    dataChannelConsumerTaskExecutor.joinConsumerTaskJob();
-    while (infoChannel.isLoadInProgress())
-    {
-      log.info("Waiting for {} to finish loading...", infoChannel);
-      Thread.sleep(1000);
-    }
-    infoChannelConsumerTaskExecutor.joinConsumerTaskJob();
-  }
-}