WIP:refactor: Refined channel-states, introduced `ChannelState` -- ALIGN
authorKai Moritz <kai@juplo.de>
Mon, 4 Mar 2024 14:09:15 +0000 (15:09 +0100)
committerKai Moritz <kai@juplo.de>
Mon, 4 Mar 2024 14:09:15 +0000 (15:09 +0100)
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ChannelExecutor.java
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ChannelRunner.java
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/InfoChannel.java
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesApplicationRunner.java
src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaTestUtils.java

index fb55448..087d94d 100644 (file)
@@ -2,6 +2,7 @@ package de.juplo.kafka.chat.backend.implementation.kafka;
 
 import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo;
 import jakarta.annotation.PreDestroy;
+import lombok.Getter;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.Consumer;
@@ -15,7 +16,8 @@ import java.util.concurrent.CompletableFuture;
 public class ChannelExecutor
 {
   private final ThreadPoolTaskExecutor taskExecutor;
-  private final Runnable consumerTask;
+  @Getter
+  private final Channel channel;
   private final Consumer<String, AbstractMessageTo> consumer;
   private final WorkAssignor workAssignor;
 
@@ -25,12 +27,12 @@ public class ChannelExecutor
   public void executeConsumerTask()
   {
     workAssignor.assignWork(consumer);
-    log.info("Starting the consumer-task for {}", consumerTask);
+    log.info("Starting the consumer-task for {}", channel);
     consumerTaskJob = taskExecutor
-        .submitCompletable(consumerTask)
+        .submitCompletable(channel)
         .exceptionally(e ->
         {
-          log.error("The consumer-task for {} exited abnormally!", consumerTask, e);
+          log.error("The consumer-task for {} exited abnormally!", channel, e);
           return null;
         });
   }
@@ -38,10 +40,10 @@ public class ChannelExecutor
   @PreDestroy
   public void joinConsumerTaskJob()
   {
-    log.info("Signaling the consumer-task for {} to quit its work", consumerTask);
+    log.info("Signaling the consumer-task for {} to quit its work", channel);
     consumer.wakeup();
-    log.info("Waiting for the consumer of {} to finish its work", consumerTask);
+    log.info("Waiting for the consumer of {} to finish its work", channel);
     consumerTaskJob.join();
-    log.info("Joined the consumer-task for {}", consumerTask);
+    log.info("Joined the consumer-task for {}", channel);
   }
 }
index 9520448..92db79b 100644 (file)
@@ -8,24 +8,25 @@ import lombok.extern.slf4j.Slf4j;
 @Slf4j
 public class ChannelRunner
 {
-  private final ChannelExecutor infoChannelChannelExecutor;
-  private final ChannelExecutor dataChannelChannelExecutor;
-  private final InfoChannel infoChannel;
+  private final ChannelExecutor infoChannelExecutor;
+  private final ChannelExecutor dataChannelExecutor;
 
-  public void executeConsumerTasks()
+  public void executeChannel()
   {
-    infoChannelChannelExecutor.executeConsumerTask();
-    dataChannelChannelExecutor.executeConsumerTask();
+    infoChannelExecutor.executeConsumerTask();
+    dataChannelExecutor.executeConsumerTask();
   }
 
-  public void joinConsumerTasks() throws InterruptedException
+  public void joinChannel() throws InterruptedException
   {
-    dataChannelChannelExecutor.joinConsumerTaskJob();
+    dataChannelExecutor.joinConsumerTaskJob();
     while (infoChannel.getChannelState() != ChannelState.SHUTTING_DOWN)
     {
       log.info("Waiting for {} to shut down...", infoChannel);
       Thread.sleep(1000);
     }
-    infoChannelChannelExecutor.joinConsumerTaskJob();
+    infoChannelExecutor.joinConsumerTaskJob();
   }
+
+  private void joinChannel()
 }
index 5a5a683..0b572f4 100644 (file)
@@ -8,6 +8,7 @@ import de.juplo.kafka.chat.backend.domain.exceptions.ShardNotOwnedException;
 import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo;
 import de.juplo.kafka.chat.backend.implementation.kafka.messages.data.EventChatMessageReceivedTo;
 import lombok.Getter;
+import lombok.ToString;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.*;
 import org.apache.kafka.clients.producer.Producer;
@@ -24,8 +25,9 @@ import java.util.UUID;
 import java.util.stream.IntStream;
 
 
+@ToString(of = { "topic", "instanceId" })
 @Slf4j
-public class DataChannel implements Runnable, ConsumerRebalanceListener
+public class DataChannel implements Channel, ConsumerRebalanceListener
 {
   private final String instanceId;
   private final String topic;
index 13556da..ec4a5eb 100644 (file)
@@ -6,6 +6,7 @@ import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventChatR
 import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventShardAssigned;
 import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventShardRevoked;
 import lombok.Getter;
+import lombok.ToString;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -24,8 +25,9 @@ import java.util.UUID;
 import java.util.stream.IntStream;
 
 
+@ToString(of = { "topic", "instanceUri" })
 @Slf4j
-public class InfoChannel implements Runnable
+public class InfoChannel implements Channel
 {
   private final String topic;
   private final Producer<String, AbstractMessageTo> producer;
index ba42707..f44c9b5 100644 (file)
@@ -22,12 +22,12 @@ public class KafkaServicesApplicationRunner implements ApplicationRunner
   @Override
   public void run(ApplicationArguments args)
   {
-    channelRunner.executeConsumerTasks();
+    channelRunner.executeChannel();
   }
 
   @PreDestroy
   public void joinConsumerTasks() throws InterruptedException
   {
-    channelRunner.joinConsumerTasks();
+    channelRunner.joinChannel();
   }
 }
index eb1a19d..ebd1b68 100644 (file)
@@ -52,7 +52,7 @@ public abstract class KafkaTestUtils
     send(messageTemplate, dataTopic, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 2, \"user\" : \"peter\", \"text\" : \"Willst du mit mir gehen?\" }", "event_chatmessage_received");
     send(messageTemplate, dataTopic, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"klaus\", \"text\" : \"Ja? Nein? Vielleicht??\" }", "event_chatmessage_received");
 
-    channelRunner.executeConsumerTasks();
+    channelRunner.executeChannel();
   }
 
   private static void send(
@@ -74,6 +74,6 @@ public abstract class KafkaTestUtils
 
   public static void joinConsumerTasks(ChannelRunner channelRunner) throws InterruptedException
   {
-    channelRunner.joinConsumerTasks();
+    channelRunner.joinChannel();
   }
 }