refactor: Refined channel-states, introduced `ChannelState` -- ALIGN
authorKai Moritz <kai@juplo.de>
Mon, 4 Mar 2024 13:34:34 +0000 (14:34 +0100)
committerKai Moritz <kai@juplo.de>
Mon, 4 Mar 2024 13:34:34 +0000 (14:34 +0100)
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ChannelNotReadyException.java
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/InfoChannel.java
src/test/java/de/juplo/kafka/chat/backend/domain/ChatHomeServiceTest.java
src/test/java/de/juplo/kafka/chat/backend/domain/ChatHomeServiceWithShardsTest.java

index 8a0a81f..808187b 100644 (file)
@@ -1,9 +1,9 @@
-package de.juplo.kafka.chat.backend.domain.exceptions;
+package de.juplo.kafka.chat.backend.implementation.kafka;
 
 
-public class LoadInProgressException extends IllegalStateException
+public class ChannelNotReadyException extends IllegalStateException
 {
-  public LoadInProgressException()
+  public ChannelNotReadyException()
   {
     super("Load in progress...");
   }
index e1754a1..3af0a57 100644 (file)
@@ -1,7 +1,6 @@
 package de.juplo.kafka.chat.backend.implementation.kafka;
 
 import de.juplo.kafka.chat.backend.domain.*;
-import de.juplo.kafka.chat.backend.domain.exceptions.LoadInProgressException;
 import de.juplo.kafka.chat.backend.domain.exceptions.ShardNotOwnedException;
 import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo;
 import de.juplo.kafka.chat.backend.implementation.kafka.messages.data.EventChatMessageReceivedTo;
@@ -319,7 +318,7 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener
   {
     if (loadInProgress)
     {
-      return Mono.error(new LoadInProgressException());
+      return Mono.error(new ChannelNotReadyException());
     }
 
     if (!isShardOwned[shard])
index f3150ce..80938fc 100644 (file)
@@ -1,7 +1,6 @@
 package de.juplo.kafka.chat.backend.implementation.kafka;
 
 import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
-import de.juplo.kafka.chat.backend.domain.exceptions.LoadInProgressException;
 import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo;
 import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventChatRoomCreated;
 import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventShardAssigned;
@@ -13,7 +12,6 @@ import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.errors.WakeupException;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
@@ -299,7 +297,7 @@ public class InfoChannel implements Runnable
   {
     if (loadInProgress)
     {
-      return Mono.error(new LoadInProgressException());
+      return Mono.error(new ChannelNotReadyException());
     }
 
     return Mono.fromSupplier(() -> chatRoomInfo.get(id));
index d859b14..3be9a35 100644 (file)
@@ -3,7 +3,7 @@ package de.juplo.kafka.chat.backend.domain;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
 import de.juplo.kafka.chat.backend.ChatBackendProperties;
-import de.juplo.kafka.chat.backend.domain.exceptions.LoadInProgressException;
+import de.juplo.kafka.chat.backend.implementation.kafka.ChannelNotReadyException;
 import de.juplo.kafka.chat.backend.domain.exceptions.UnknownChatroomException;
 import de.juplo.kafka.chat.backend.implementation.inmemory.InMemoryServicesConfiguration;
 import de.juplo.kafka.chat.backend.implementation.kafka.KafkaServicesConfiguration;
@@ -49,7 +49,7 @@ public abstract class ChatHomeServiceTest
         .log("testGetExistingChatroom")
         .retryWhen(Retry
             .backoff(5, Duration.ofSeconds(1))
-            .filter(throwable -> throwable instanceof LoadInProgressException));
+            .filter(throwable -> throwable instanceof ChannelNotReadyException));
 
     // Then
     assertThat(mono).emitsCount(1);
@@ -68,7 +68,7 @@ public abstract class ChatHomeServiceTest
         .log("testGetNonExistentChatroom")
         .retryWhen(Retry
             .backoff(5, Duration.ofSeconds(1))
-            .filter(throwable -> throwable instanceof LoadInProgressException));
+            .filter(throwable -> throwable instanceof ChannelNotReadyException));
 
     // Then
     assertThat(mono).sendsError(e ->
index 5eeac47..6d15675 100644 (file)
@@ -1,6 +1,6 @@
 package de.juplo.kafka.chat.backend.domain;
 
-import de.juplo.kafka.chat.backend.domain.exceptions.LoadInProgressException;
+import de.juplo.kafka.chat.backend.implementation.kafka.ChannelNotReadyException;
 import de.juplo.kafka.chat.backend.domain.exceptions.ShardNotOwnedException;
 import org.junit.jupiter.api.DisplayName;
 import org.junit.jupiter.api.Test;
@@ -33,7 +33,7 @@ public abstract class ChatHomeServiceWithShardsTest extends ChatHomeServiceTest
         .log("testGetChatroomForNotOwnedShard")
         .retryWhen(Retry
             .backoff(5, Duration.ofSeconds(1))
-            .filter(throwable -> throwable instanceof LoadInProgressException));
+            .filter(throwable -> throwable instanceof ChannelNotReadyException));
 
     // Then
     assertThat(mono).sendsError(e ->