-package de.juplo.kafka.chat.backend.domain.exceptions;
+package de.juplo.kafka.chat.backend.implementation.kafka;
-public class LoadInProgressException extends IllegalStateException
+public class ChannelNotReadyException extends IllegalStateException
{
- public LoadInProgressException()
+ public ChannelNotReadyException()
{
super("Load in progress...");
}
package de.juplo.kafka.chat.backend.implementation.kafka;
import de.juplo.kafka.chat.backend.domain.*;
-import de.juplo.kafka.chat.backend.domain.exceptions.LoadInProgressException;
import de.juplo.kafka.chat.backend.domain.exceptions.ShardNotOwnedException;
import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo;
import de.juplo.kafka.chat.backend.implementation.kafka.messages.data.EventChatMessageReceivedTo;
{
if (loadInProgress)
{
- return Mono.error(new LoadInProgressException());
+ return Mono.error(new ChannelNotReadyException());
}
if (!isShardOwned[shard])
package de.juplo.kafka.chat.backend.implementation.kafka;
import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
-import de.juplo.kafka.chat.backend.domain.exceptions.LoadInProgressException;
import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo;
import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventChatRoomCreated;
import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventShardAssigned;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.errors.WakeupException;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
{
if (loadInProgress)
{
- return Mono.error(new LoadInProgressException());
+ return Mono.error(new ChannelNotReadyException());
}
return Mono.fromSupplier(() -> chatRoomInfo.get(id));
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import de.juplo.kafka.chat.backend.ChatBackendProperties;
-import de.juplo.kafka.chat.backend.domain.exceptions.LoadInProgressException;
+import de.juplo.kafka.chat.backend.implementation.kafka.ChannelNotReadyException;
import de.juplo.kafka.chat.backend.domain.exceptions.UnknownChatroomException;
import de.juplo.kafka.chat.backend.implementation.inmemory.InMemoryServicesConfiguration;
import de.juplo.kafka.chat.backend.implementation.kafka.KafkaServicesConfiguration;
.log("testGetExistingChatroom")
.retryWhen(Retry
.backoff(5, Duration.ofSeconds(1))
- .filter(throwable -> throwable instanceof LoadInProgressException));
+ .filter(throwable -> throwable instanceof ChannelNotReadyException));
// Then
assertThat(mono).emitsCount(1);
.log("testGetNonExistentChatroom")
.retryWhen(Retry
.backoff(5, Duration.ofSeconds(1))
- .filter(throwable -> throwable instanceof LoadInProgressException));
+ .filter(throwable -> throwable instanceof ChannelNotReadyException));
// Then
assertThat(mono).sendsError(e ->
package de.juplo.kafka.chat.backend.domain;
-import de.juplo.kafka.chat.backend.domain.exceptions.LoadInProgressException;
+import de.juplo.kafka.chat.backend.implementation.kafka.ChannelNotReadyException;
import de.juplo.kafka.chat.backend.domain.exceptions.ShardNotOwnedException;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
.log("testGetChatroomForNotOwnedShard")
.retryWhen(Retry
.backoff(5, Duration.ofSeconds(1))
- .filter(throwable -> throwable instanceof LoadInProgressException));
+ .filter(throwable -> throwable instanceof ChannelNotReadyException));
// Then
assertThat(mono).sendsError(e ->