WIP:refactor: Refined channel-states, introduced `ChannelState` -- ALIGN
authorKai Moritz <kai@juplo.de>
Mon, 4 Mar 2024 13:45:22 +0000 (14:45 +0100)
committerKai Moritz <kai@juplo.de>
Mon, 4 Mar 2024 13:45:22 +0000 (14:45 +0100)
src/main/java/de/juplo/kafka/chat/backend/domain/exceptions/ChannelNotReadyException.java [deleted file]
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ChannelNotReadyException.java
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ChannelState.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/InfoChannel.java

diff --git a/src/main/java/de/juplo/kafka/chat/backend/domain/exceptions/ChannelNotReadyException.java b/src/main/java/de/juplo/kafka/chat/backend/domain/exceptions/ChannelNotReadyException.java
deleted file mode 100644 (file)
index 7d8f3d5..0000000
+++ /dev/null
@@ -1,16 +0,0 @@
-package de.juplo.kafka.chat.backend.domain.exceptions;
-
-
-import de.juplo.kafka.chat.backend.implementation.kafka.DataChannel;
-
-
-public class ChannelNotReadyException extends IllegalStateException
-{
-  public final DataChannel.State state;
-
-  public ChannelNotReadyException(DataChannel.State state)
-  {
-    super("Not ready! Current state: " + state);
-    this.state = state;
-  }
-}
index 808187b..0746748 100644 (file)
@@ -3,8 +3,11 @@ package de.juplo.kafka.chat.backend.implementation.kafka;
 
 public class ChannelNotReadyException extends IllegalStateException
 {
-  public ChannelNotReadyException()
+  public final ChannelState state;
+
+  public ChannelNotReadyException(ChannelState state)
   {
-    super("Load in progress...");
+    super("Not ready! Current state: " + state);
+    this.state = state;
   }
 }
diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ChannelState.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ChannelState.java
new file mode 100644 (file)
index 0000000..554b4d6
--- /dev/null
@@ -0,0 +1,9 @@
+package de.juplo.kafka.chat.backend.implementation.kafka;
+
+public enum ChannelState
+{
+  STARTING,
+  LOAD_IN_PROGRESS,
+  READY,
+  SHUTTING_DOWN
+}
index 4daaef1..5a5a683 100644 (file)
@@ -1,6 +1,9 @@
 package de.juplo.kafka.chat.backend.implementation.kafka;
 
-import de.juplo.kafka.chat.backend.domain.*;
+import de.juplo.kafka.chat.backend.domain.ChatRoomData;
+import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
+import de.juplo.kafka.chat.backend.domain.Message;
+import de.juplo.kafka.chat.backend.domain.ShardingPublisherStrategy;
 import de.juplo.kafka.chat.backend.domain.exceptions.ShardNotOwnedException;
 import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo;
 import de.juplo.kafka.chat.backend.implementation.kafka.messages.data.EventChatMessageReceivedTo;
@@ -14,7 +17,10 @@ import org.apache.kafka.common.errors.WakeupException;
 import reactor.core.publisher.Mono;
 
 import java.time.*;
-import java.util.*;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
 import java.util.stream.IntStream;
 
 
@@ -39,7 +45,7 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener
 
   private boolean running;
   @Getter
-  private volatile State state = State.STARTING;
+  private volatile ChannelState channelState = ChannelState.STARTING;
 
 
   public DataChannel(
@@ -128,7 +134,7 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener
   public void onPartitionsAssigned(Collection<TopicPartition> partitions)
   {
     log.info("Newly assigned partitions! Pausing normal operations...");
-    state = State.LOAD_IN_PROGRESS;
+    channelState = ChannelState.LOAD_IN_PROGRESS;
 
     consumer.endOffsets(partitions).forEach((topicPartition, currentOffset) ->
     {
@@ -195,7 +201,7 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener
         ConsumerRecords<String, AbstractMessageTo> records = consumer.poll(pollingInterval);
         log.info("Fetched {} messages", records.count());
 
-        switch (state)
+        switch (channelState)
         {
           case LOAD_IN_PROGRESS ->
           {
@@ -206,7 +212,7 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener
               log.info("Loading of messages completed! Pausing all owned partitions...");
               pauseAllOwnedPartions();
               log.info("Resuming normal operations...");
-              state = State.READY;
+              channelState = ChannelState.READY;
             }
           }
           case SHUTTING_DOWN -> log.info("Shutdown in progress: ignoring {} fetched messages.", records.count());
@@ -214,7 +220,7 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener
           {
             if (!records.isEmpty())
             {
-              throw new IllegalStateException("All owned partitions should be paused, when in state " + state);
+              throw new IllegalStateException("All owned partitions should be paused, when in state " + channelState);
             }
           }
         }
@@ -320,8 +326,8 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener
 
   Mono<ChatRoomData> getChatRoomData(int shard, UUID id)
   {
-    State capturedState = state;
-    if (capturedState != State.READY)
+    ChannelState capturedState = channelState;
+    if (capturedState != ChannelState.READY)
     {
       return Mono.error(new ChannelNotReadyException(capturedState));
     }
@@ -359,12 +365,4 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener
   {
     return consumer.groupMetadata();
   }
-
-  public enum State
-  {
-    STARTING,
-    LOAD_IN_PROGRESS,
-    READY,
-    SHUTTING_DOWN
-  }
 }
index 80938fc..13556da 100644 (file)
@@ -17,7 +17,7 @@ import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
 import java.net.URI;
-import java.time.*;
+import java.time.Duration;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.UUID;
@@ -41,7 +41,7 @@ public class InfoChannel implements Runnable
 
   private boolean running;
   @Getter
-  private volatile boolean loadInProgress = true;
+  private volatile ChannelState channelState = ChannelState.STARTING;
 
 
   public InfoChannel(
@@ -191,7 +191,7 @@ public class InfoChannel implements Runnable
     IntStream
         .range(0, numShards)
         .forEach(partition -> this.nextOffset[partition] = 0l);
-    loadInProgress = true;
+    channelState = ChannelState.LOAD_IN_PROGRESS;
 
     while (running)
     {
@@ -218,10 +218,15 @@ public class InfoChannel implements Runnable
   private void updateNextOffset(int partition, long nextOffset)
   {
     this.nextOffset[partition] = nextOffset;
-    if (loadInProgress) {
-      loadInProgress = IntStream
+    if (channelState == ChannelState.LOAD_IN_PROGRESS)
+    {
+      boolean loadInProgress = IntStream
           .range(0, numShards)
           .anyMatch(shard -> this.nextOffset[shard] < currentOffset[partition]);
+      if (!loadInProgress)
+      {
+        channelState = ChannelState.READY;
+      }
     }
   }
 
@@ -295,9 +300,10 @@ public class InfoChannel implements Runnable
 
   Mono<ChatRoomInfo> getChatRoomInfo(UUID id)
   {
-    if (loadInProgress)
+    ChannelState capturedState = channelState;
+    if (capturedState != ChannelState.READY)
     {
-      return Mono.error(new ChannelNotReadyException());
+      return Mono.error(new ChannelNotReadyException(capturedState));
     }
 
     return Mono.fromSupplier(() -> chatRoomInfo.get(id));