WIP:refactor: Refined channel-states, introduced `ChannelState` -- ALIGN
authorKai Moritz <kai@juplo.de>
Mon, 4 Mar 2024 13:37:23 +0000 (14:37 +0100)
committerKai Moritz <kai@juplo.de>
Mon, 4 Mar 2024 13:37:23 +0000 (14:37 +0100)
src/main/java/de/juplo/kafka/chat/backend/domain/exceptions/ChannelNotReadyException.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java
src/test/java/de/juplo/kafka/chat/backend/AbstractHandoverIT.java
src/test/java/de/juplo/kafka/chat/backend/TestWriter.java

diff --git a/src/main/java/de/juplo/kafka/chat/backend/domain/exceptions/ChannelNotReadyException.java b/src/main/java/de/juplo/kafka/chat/backend/domain/exceptions/ChannelNotReadyException.java
new file mode 100644 (file)
index 0000000..7d8f3d5
--- /dev/null
@@ -0,0 +1,16 @@
+package de.juplo.kafka.chat.backend.domain.exceptions;
+
+
+import de.juplo.kafka.chat.backend.implementation.kafka.DataChannel;
+
+
+public class ChannelNotReadyException extends IllegalStateException
+{
+  public final DataChannel.State state;
+
+  public ChannelNotReadyException(DataChannel.State state)
+  {
+    super("Not ready! Current state: " + state);
+    this.state = state;
+  }
+}
index 3af0a57..4daaef1 100644 (file)
@@ -39,7 +39,7 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener
 
   private boolean running;
   @Getter
-  private volatile boolean loadInProgress;
+  private volatile State state = State.STARTING;
 
 
   public DataChannel(
@@ -128,7 +128,7 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener
   public void onPartitionsAssigned(Collection<TopicPartition> partitions)
   {
     log.info("Newly assigned partitions! Pausing normal operations...");
-    loadInProgress = true;
+    state = State.LOAD_IN_PROGRESS;
 
     consumer.endOffsets(partitions).forEach((topicPartition, currentOffset) ->
     {
@@ -195,23 +195,27 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener
         ConsumerRecords<String, AbstractMessageTo> records = consumer.poll(pollingInterval);
         log.info("Fetched {} messages", records.count());
 
-        if (loadInProgress)
+        switch (state)
         {
-          loadChatRoomData(records);
-
-          if (isLoadingCompleted())
+          case LOAD_IN_PROGRESS ->
           {
-            log.info("Loading of messages completed! Pausing all owned partitions...");
-            pauseAllOwnedPartions();
-            log.info("Resuming normal operations...");
-            loadInProgress = false;
+            loadChatRoomData(records);
+
+            if (isLoadingCompleted())
+            {
+              log.info("Loading of messages completed! Pausing all owned partitions...");
+              pauseAllOwnedPartions();
+              log.info("Resuming normal operations...");
+              state = State.READY;
+            }
           }
-        }
-        else
-        {
-          if (!records.isEmpty())
+          case SHUTTING_DOWN -> log.info("Shutdown in progress: ignoring {} fetched messages.", records.count());
+          default ->
           {
-            throw new IllegalStateException("All owned partitions should be paused, when no load is in progress!");
+            if (!records.isEmpty())
+            {
+              throw new IllegalStateException("All owned partitions should be paused, when in state " + state);
+            }
           }
         }
       }
@@ -316,9 +320,10 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener
 
   Mono<ChatRoomData> getChatRoomData(int shard, UUID id)
   {
-    if (loadInProgress)
+    State capturedState = state;
+    if (capturedState != State.READY)
     {
-      return Mono.error(new ChannelNotReadyException());
+      return Mono.error(new ChannelNotReadyException(capturedState));
     }
 
     if (!isShardOwned[shard])
@@ -354,4 +359,12 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener
   {
     return consumer.groupMetadata();
   }
+
+  public enum State
+  {
+    STARTING,
+    LOAD_IN_PROGRESS,
+    READY,
+    SHUTTING_DOWN
+  }
 }
index bac07ff..b335155 100644 (file)
@@ -69,8 +69,24 @@ public abstract class AbstractHandoverIT
     log.info("Sleeping for 3 seconds...");
     Thread.sleep(3000);
 
+    log.info("Starting Backend 2...");
+    containers.backend2.start();
+    log.info("Backend 2 started");
+
+    log.info("Sleeping for 3 seconds...");
+    Thread.sleep(3000);
+
+    log.info("Starting Backend 3...");
+    containers.backend3.start();
+    log.info("Backend 3 started");
+
+    log.info("Sleeping for 3 seconds...");
+    Thread.sleep(3000);
+
+    log.info("Shutting down writers...");
     for (int i = 0; i < NUM_CLIENTS; i++)
     {
+      log.info("Shutting down writer {}", i);
       testWriters[i].running = false;
       testWriterFutures[i].join();
       log.info("Joined TestWriter {}", testWriters[i].user);
index d4cc719..c21acda 100644 (file)
@@ -45,7 +45,7 @@ public class TestWriter
         .delayElements(Duration.ofMillis(ThreadLocalRandom.current().nextLong(500, 1500)))
         .map(i -> "Message #" + i)
         .flatMap(message -> sendMessage(chatRoom, message)
-            .retryWhen(Retry.fixedDelay(10, Duration.ofSeconds(1))))
+            .retryWhen(Retry.backoff(10, Duration.ofSeconds(1))))
         .doOnNext(message ->
         {
           sentMessages.add(message);