{
if (loadInProgress)
{
+ log.error("Load in progress!");
return Mono.error(new LoadInProgressException());
}
if (!isShardOwned[shard])
{
+ log.error("Shard {} for chat-room {] not owned!", shard, id);
return Mono.error(new ShardNotOwnedException(shard));
}
int shard = selectShard(id);
return chatRoomChannel
.getChatRoom(shard, id)
- .switchIfEmpty(Mono.error(() -> new UnknownChatroomException(
- id,
- shard,
- chatRoomChannel.getOwnedShards())));
+ .switchIfEmpty(Mono.error(() -> {
+ log.error("Unknown chat-room {} (shard={})!", id, shard);
+ return new UnknownChatroomException(
+ id,
+ shard,
+ chatRoomChannel.getOwnedShards());
+ }));
}
int selectShard(UUID chatRoomId)
import reactor.core.publisher.Mono;
import reactor.util.retry.Retry;
+import java.time.Duration;
import java.util.UUID;
import static pl.rzrz.assertj.reactor.Assertions.assertThat;
// When
Mono<ChatRoom> mono = chatHome
.getChatRoom(chatRoomId)
- .retryWhen(Retry.indefinitely().filter(throwable -> throwable instanceof LoadInProgressException));
+ .log()
+ .retryWhen(Retry
+ .backoff(5, Duration.ofSeconds(1))
+ .filter(throwable -> throwable instanceof LoadInProgressException))
+ .log();
// Then
assertThat(mono).emitsCount(1);
// When
Mono<ChatRoom> mono = chatHome
.getChatRoom(chatRoomId)
- .retryWhen(Retry.indefinitely().filter(throwable -> throwable instanceof LoadInProgressException));
+ .log()
+ .retryWhen(Retry
+ .backoff(5, Duration.ofSeconds(1))
+ .filter(throwable -> throwable instanceof LoadInProgressException))
+ .log();
// Then
assertThat(mono).sendsError(e ->
import reactor.core.publisher.Mono;
import reactor.util.retry.Retry;
+import java.time.Duration;
import java.util.UUID;
import static pl.rzrz.assertj.reactor.Assertions.assertThat;
// When
Mono<ChatRoom> mono = chatHome
.getChatRoom(chatRoomId)
- .retryWhen(Retry.indefinitely().filter(throwable -> throwable instanceof LoadInProgressException));
+ .log()
+ .retryWhen(Retry
+ .backoff(5, Duration.ofSeconds(1))
+ .filter(throwable -> throwable instanceof LoadInProgressException))
+ .log();
// Then
assertThat(mono).sendsError(e ->
logging:
level:
root: INFO
- de.juplo.kafka.chat.backend: DEBUG
+ de.juplo.kafka.chat.backend: TRACE