From: Kai Moritz <kai@juplo.de>
Date: Mon, 4 Mar 2024 14:09:15 +0000 (+0100)
Subject: WIP:refactor: Refined channel-states, introduced `ChannelState` -- ALIGN
X-Git-Tag: rebase--2024-03-05--09-07~7
X-Git-Url: https://juplo.de/gitweb/?a=commitdiff_plain;h=571bd14522b51222cb55d5bc6516cce09eb7c04b;p=demos%2Fkafka%2Fchat

WIP:refactor: Refined channel-states, introduced `ChannelState` -- ALIGN
---

diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ChannelExecutor.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ChannelExecutor.java
index fb554483..087d94d3 100644
--- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ChannelExecutor.java
+++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ChannelExecutor.java
@@ -2,6 +2,7 @@ package de.juplo.kafka.chat.backend.implementation.kafka;
 
 import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo;
 import jakarta.annotation.PreDestroy;
+import lombok.Getter;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.Consumer;
@@ -15,7 +16,8 @@ import java.util.concurrent.CompletableFuture;
 public class ChannelExecutor
 {
   private final ThreadPoolTaskExecutor taskExecutor;
-  private final Runnable consumerTask;
+  @Getter
+  private final Channel channel;
   private final Consumer<String, AbstractMessageTo> consumer;
   private final WorkAssignor workAssignor;
 
@@ -25,12 +27,12 @@ public class ChannelExecutor
   public void executeConsumerTask()
   {
     workAssignor.assignWork(consumer);
-    log.info("Starting the consumer-task for {}", consumerTask);
+    log.info("Starting the consumer-task for {}", channel);
     consumerTaskJob = taskExecutor
-        .submitCompletable(consumerTask)
+        .submitCompletable(channel)
         .exceptionally(e ->
         {
-          log.error("The consumer-task for {} exited abnormally!", consumerTask, e);
+          log.error("The consumer-task for {} exited abnormally!", channel, e);
           return null;
         });
   }
@@ -38,10 +40,10 @@ public class ChannelExecutor
   @PreDestroy
   public void joinConsumerTaskJob()
   {
-    log.info("Signaling the consumer-task for {} to quit its work", consumerTask);
+    log.info("Signaling the consumer-task for {} to quit its work", channel);
     consumer.wakeup();
-    log.info("Waiting for the consumer of {} to finish its work", consumerTask);
+    log.info("Waiting for the consumer of {} to finish its work", channel);
     consumerTaskJob.join();
-    log.info("Joined the consumer-task for {}", consumerTask);
+    log.info("Joined the consumer-task for {}", channel);
   }
 }
diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ChannelRunner.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ChannelRunner.java
index 95204486..92db79bf 100644
--- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ChannelRunner.java
+++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ChannelRunner.java
@@ -8,24 +8,25 @@ import lombok.extern.slf4j.Slf4j;
 @Slf4j
 public class ChannelRunner
 {
-  private final ChannelExecutor infoChannelChannelExecutor;
-  private final ChannelExecutor dataChannelChannelExecutor;
-  private final InfoChannel infoChannel;
+  private final ChannelExecutor infoChannelExecutor;
+  private final ChannelExecutor dataChannelExecutor;
 
-  public void executeConsumerTasks()
+  public void executeChannel()
   {
-    infoChannelChannelExecutor.executeConsumerTask();
-    dataChannelChannelExecutor.executeConsumerTask();
+    infoChannelExecutor.executeConsumerTask();
+    dataChannelExecutor.executeConsumerTask();
   }
 
-  public void joinConsumerTasks() throws InterruptedException
+  public void joinChannel() throws InterruptedException
   {
-    dataChannelChannelExecutor.joinConsumerTaskJob();
+    dataChannelExecutor.joinConsumerTaskJob();
     while (infoChannel.getChannelState() != ChannelState.SHUTTING_DOWN)
     {
       log.info("Waiting for {} to shut down...", infoChannel);
       Thread.sleep(1000);
     }
-    infoChannelChannelExecutor.joinConsumerTaskJob();
+    infoChannelExecutor.joinConsumerTaskJob();
   }
+
+  private void joinChannel()
 }
diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java
index 5a5a6838..0b572f48 100644
--- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java
+++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java
@@ -8,6 +8,7 @@ import de.juplo.kafka.chat.backend.domain.exceptions.ShardNotOwnedException;
 import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo;
 import de.juplo.kafka.chat.backend.implementation.kafka.messages.data.EventChatMessageReceivedTo;
 import lombok.Getter;
+import lombok.ToString;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.*;
 import org.apache.kafka.clients.producer.Producer;
@@ -24,8 +25,9 @@ import java.util.UUID;
 import java.util.stream.IntStream;
 
 
+@ToString(of = { "topic", "instanceId" })
 @Slf4j
-public class DataChannel implements Runnable, ConsumerRebalanceListener
+public class DataChannel implements Channel, ConsumerRebalanceListener
 {
   private final String instanceId;
   private final String topic;
diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/InfoChannel.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/InfoChannel.java
index 13556dab..ec4a5ebd 100644
--- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/InfoChannel.java
+++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/InfoChannel.java
@@ -6,6 +6,7 @@ import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventChatR
 import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventShardAssigned;
 import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventShardRevoked;
 import lombok.Getter;
+import lombok.ToString;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -24,8 +25,9 @@ import java.util.UUID;
 import java.util.stream.IntStream;
 
 
+@ToString(of = { "topic", "instanceUri" })
 @Slf4j
-public class InfoChannel implements Runnable
+public class InfoChannel implements Channel
 {
   private final String topic;
   private final Producer<String, AbstractMessageTo> producer;
diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesApplicationRunner.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesApplicationRunner.java
index ba427079..f44c9b52 100644
--- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesApplicationRunner.java
+++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesApplicationRunner.java
@@ -22,12 +22,12 @@ public class KafkaServicesApplicationRunner implements ApplicationRunner
   @Override
   public void run(ApplicationArguments args)
   {
-    channelRunner.executeConsumerTasks();
+    channelRunner.executeChannel();
   }
 
   @PreDestroy
   public void joinConsumerTasks() throws InterruptedException
   {
-    channelRunner.joinConsumerTasks();
+    channelRunner.joinChannel();
   }
 }
diff --git a/src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaTestUtils.java b/src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaTestUtils.java
index eb1a19d5..ebd1b680 100644
--- a/src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaTestUtils.java
+++ b/src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaTestUtils.java
@@ -52,7 +52,7 @@ public abstract class KafkaTestUtils
     send(messageTemplate, dataTopic, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 2, \"user\" : \"peter\", \"text\" : \"Willst du mit mir gehen?\" }", "event_chatmessage_received");
     send(messageTemplate, dataTopic, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"klaus\", \"text\" : \"Ja? Nein? Vielleicht??\" }", "event_chatmessage_received");
 
-    channelRunner.executeConsumerTasks();
+    channelRunner.executeChannel();
   }
 
   private static void send(
@@ -74,6 +74,6 @@ public abstract class KafkaTestUtils
 
   public static void joinConsumerTasks(ChannelRunner channelRunner) throws InterruptedException
   {
-    channelRunner.joinConsumerTasks();
+    channelRunner.joinChannel();
   }
 }