From e064a27030d6e3ab9d6faae91b89d6b23ecaf093 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Mon, 4 Mar 2024 14:34:34 +0100 Subject: [PATCH] refactor: Refined channel-states, introduced `ChannelState` -- ALIGN --- .../implementation/kafka/ChannelNotReadyException.java | 6 +++--- .../chat/backend/implementation/kafka/DataChannel.java | 3 +-- .../chat/backend/implementation/kafka/InfoChannel.java | 4 +--- .../kafka/chat/backend/domain/ChatHomeServiceTest.java | 6 +++--- .../chat/backend/domain/ChatHomeServiceWithShardsTest.java | 4 ++-- 5 files changed, 10 insertions(+), 13 deletions(-) diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ChannelNotReadyException.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ChannelNotReadyException.java index 8a0a81f9..808187be 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ChannelNotReadyException.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ChannelNotReadyException.java @@ -1,9 +1,9 @@ -package de.juplo.kafka.chat.backend.domain.exceptions; +package de.juplo.kafka.chat.backend.implementation.kafka; -public class LoadInProgressException extends IllegalStateException +public class ChannelNotReadyException extends IllegalStateException { - public LoadInProgressException() + public ChannelNotReadyException() { super("Load in progress..."); } diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java index e1754a15..3af0a570 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java @@ -1,7 +1,6 @@ package de.juplo.kafka.chat.backend.implementation.kafka; import de.juplo.kafka.chat.backend.domain.*; -import de.juplo.kafka.chat.backend.domain.exceptions.LoadInProgressException; import de.juplo.kafka.chat.backend.domain.exceptions.ShardNotOwnedException; import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo; import de.juplo.kafka.chat.backend.implementation.kafka.messages.data.EventChatMessageReceivedTo; @@ -319,7 +318,7 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener { if (loadInProgress) { - return Mono.error(new LoadInProgressException()); + return Mono.error(new ChannelNotReadyException()); } if (!isShardOwned[shard]) diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/InfoChannel.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/InfoChannel.java index f3150ce2..80938fcd 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/InfoChannel.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/InfoChannel.java @@ -1,7 +1,6 @@ package de.juplo.kafka.chat.backend.implementation.kafka; import de.juplo.kafka.chat.backend.domain.ChatRoomInfo; -import de.juplo.kafka.chat.backend.domain.exceptions.LoadInProgressException; import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo; import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventChatRoomCreated; import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventShardAssigned; @@ -13,7 +12,6 @@ import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.errors.WakeupException; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -299,7 +297,7 @@ public class InfoChannel implements Runnable { if (loadInProgress) { - return Mono.error(new LoadInProgressException()); + return Mono.error(new ChannelNotReadyException()); } return Mono.fromSupplier(() -> chatRoomInfo.get(id)); diff --git a/src/test/java/de/juplo/kafka/chat/backend/domain/ChatHomeServiceTest.java b/src/test/java/de/juplo/kafka/chat/backend/domain/ChatHomeServiceTest.java index d859b141..3be9a35a 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/domain/ChatHomeServiceTest.java +++ b/src/test/java/de/juplo/kafka/chat/backend/domain/ChatHomeServiceTest.java @@ -3,7 +3,7 @@ package de.juplo.kafka.chat.backend.domain; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; import de.juplo.kafka.chat.backend.ChatBackendProperties; -import de.juplo.kafka.chat.backend.domain.exceptions.LoadInProgressException; +import de.juplo.kafka.chat.backend.implementation.kafka.ChannelNotReadyException; import de.juplo.kafka.chat.backend.domain.exceptions.UnknownChatroomException; import de.juplo.kafka.chat.backend.implementation.inmemory.InMemoryServicesConfiguration; import de.juplo.kafka.chat.backend.implementation.kafka.KafkaServicesConfiguration; @@ -49,7 +49,7 @@ public abstract class ChatHomeServiceTest .log("testGetExistingChatroom") .retryWhen(Retry .backoff(5, Duration.ofSeconds(1)) - .filter(throwable -> throwable instanceof LoadInProgressException)); + .filter(throwable -> throwable instanceof ChannelNotReadyException)); // Then assertThat(mono).emitsCount(1); @@ -68,7 +68,7 @@ public abstract class ChatHomeServiceTest .log("testGetNonExistentChatroom") .retryWhen(Retry .backoff(5, Duration.ofSeconds(1)) - .filter(throwable -> throwable instanceof LoadInProgressException)); + .filter(throwable -> throwable instanceof ChannelNotReadyException)); // Then assertThat(mono).sendsError(e -> diff --git a/src/test/java/de/juplo/kafka/chat/backend/domain/ChatHomeServiceWithShardsTest.java b/src/test/java/de/juplo/kafka/chat/backend/domain/ChatHomeServiceWithShardsTest.java index 5eeac471..6d156750 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/domain/ChatHomeServiceWithShardsTest.java +++ b/src/test/java/de/juplo/kafka/chat/backend/domain/ChatHomeServiceWithShardsTest.java @@ -1,6 +1,6 @@ package de.juplo.kafka.chat.backend.domain; -import de.juplo.kafka.chat.backend.domain.exceptions.LoadInProgressException; +import de.juplo.kafka.chat.backend.implementation.kafka.ChannelNotReadyException; import de.juplo.kafka.chat.backend.domain.exceptions.ShardNotOwnedException; import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Test; @@ -33,7 +33,7 @@ public abstract class ChatHomeServiceWithShardsTest extends ChatHomeServiceTest .log("testGetChatroomForNotOwnedShard") .retryWhen(Retry .backoff(5, Duration.ofSeconds(1)) - .filter(throwable -> throwable instanceof LoadInProgressException)); + .filter(throwable -> throwable instanceof ChannelNotReadyException)); // Then assertThat(mono).sendsError(e -> -- 2.20.1