From c02f290a50f9ae8d3fe3dc50bc68d8fbe122940e Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Wed, 30 Aug 2023 19:01:05 +0200 Subject: [PATCH] FUCKING SHIIIIT! --- .../backend/persistence/kafka/ChatRoomChannel.java | 2 ++ .../backend/persistence/kafka/KafkaChatHome.java | 11 +++++++---- .../kafka/chat/backend/domain/ChatHomeTestBase.java | 13 +++++++++++-- .../backend/domain/ChatHomeWithShardsTestBase.java | 7 ++++++- src/test/resources/application.yml | 2 +- 5 files changed, 27 insertions(+), 8 deletions(-) diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomChannel.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomChannel.java index 234554eb..92f294b2 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomChannel.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomChannel.java @@ -369,11 +369,13 @@ public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener { if (loadInProgress) { + log.error("Load in progress!"); return Mono.error(new LoadInProgressException()); } if (!isShardOwned[shard]) { + log.error("Shard {} for chat-room {] not owned!", shard, id); return Mono.error(new ShardNotOwnedException(shard)); } diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHome.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHome.java index 07fb8858..2788343f 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHome.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHome.java @@ -26,10 +26,13 @@ public class KafkaChatHome implements ChatHome int shard = selectShard(id); return chatRoomChannel .getChatRoom(shard, id) - .switchIfEmpty(Mono.error(() -> new UnknownChatroomException( - id, - shard, - chatRoomChannel.getOwnedShards()))); + .switchIfEmpty(Mono.error(() -> { + log.error("Unknown chat-room {} (shard={})!", id, shard); + return new UnknownChatroomException( + id, + shard, + chatRoomChannel.getOwnedShards()); + })); } int selectShard(UUID chatRoomId) diff --git a/src/test/java/de/juplo/kafka/chat/backend/domain/ChatHomeTestBase.java b/src/test/java/de/juplo/kafka/chat/backend/domain/ChatHomeTestBase.java index 8e7cecb2..bbe62a17 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/domain/ChatHomeTestBase.java +++ b/src/test/java/de/juplo/kafka/chat/backend/domain/ChatHomeTestBase.java @@ -8,6 +8,7 @@ import org.springframework.test.context.junit.jupiter.SpringExtension; import reactor.core.publisher.Mono; import reactor.util.retry.Retry; +import java.time.Duration; import java.util.UUID; import static pl.rzrz.assertj.reactor.Assertions.assertThat; @@ -30,7 +31,11 @@ public class ChatHomeTestBase // When Mono mono = chatHome .getChatRoom(chatRoomId) - .retryWhen(Retry.indefinitely().filter(throwable -> throwable instanceof LoadInProgressException)); + .log() + .retryWhen(Retry + .backoff(5, Duration.ofSeconds(1)) + .filter(throwable -> throwable instanceof LoadInProgressException)) + .log(); // Then assertThat(mono).emitsCount(1); @@ -46,7 +51,11 @@ public class ChatHomeTestBase // When Mono mono = chatHome .getChatRoom(chatRoomId) - .retryWhen(Retry.indefinitely().filter(throwable -> throwable instanceof LoadInProgressException)); + .log() + .retryWhen(Retry + .backoff(5, Duration.ofSeconds(1)) + .filter(throwable -> throwable instanceof LoadInProgressException)) + .log(); // Then assertThat(mono).sendsError(e -> diff --git a/src/test/java/de/juplo/kafka/chat/backend/domain/ChatHomeWithShardsTestBase.java b/src/test/java/de/juplo/kafka/chat/backend/domain/ChatHomeWithShardsTestBase.java index 7de73cad..74445708 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/domain/ChatHomeWithShardsTestBase.java +++ b/src/test/java/de/juplo/kafka/chat/backend/domain/ChatHomeWithShardsTestBase.java @@ -5,6 +5,7 @@ import org.junit.jupiter.api.Test; import reactor.core.publisher.Mono; import reactor.util.retry.Retry; +import java.time.Duration; import java.util.UUID; import static pl.rzrz.assertj.reactor.Assertions.assertThat; @@ -27,7 +28,11 @@ public class ChatHomeWithShardsTestBase extends ChatHomeTestBase // When Mono mono = chatHome .getChatRoom(chatRoomId) - .retryWhen(Retry.indefinitely().filter(throwable -> throwable instanceof LoadInProgressException)); + .log() + .retryWhen(Retry + .backoff(5, Duration.ofSeconds(1)) + .filter(throwable -> throwable instanceof LoadInProgressException)) + .log(); // Then assertThat(mono).sendsError(e -> diff --git a/src/test/resources/application.yml b/src/test/resources/application.yml index 96b0cb3b..856b2e24 100644 --- a/src/test/resources/application.yml +++ b/src/test/resources/application.yml @@ -1,4 +1,4 @@ logging: level: root: INFO - de.juplo.kafka.chat.backend: DEBUG + de.juplo.kafka.chat.backend: TRACE -- 2.20.1