FUCKING SHIIIIT!
authorKai Moritz <kai@juplo.de>
Wed, 30 Aug 2023 17:01:05 +0000 (19:01 +0200)
committerKai Moritz <kai@juplo.de>
Wed, 30 Aug 2023 17:01:05 +0000 (19:01 +0200)
src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomChannel.java
src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHome.java
src/test/java/de/juplo/kafka/chat/backend/domain/ChatHomeTestBase.java
src/test/java/de/juplo/kafka/chat/backend/domain/ChatHomeWithShardsTestBase.java
src/test/resources/application.yml

index 234554e..92f294b 100644 (file)
@@ -369,11 +369,13 @@ public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener
   {
     if (loadInProgress)
     {
+      log.error("Load in progress!");
       return Mono.error(new LoadInProgressException());
     }
 
     if (!isShardOwned[shard])
     {
+      log.error("Shard {} for chat-room {] not owned!", shard, id);
       return Mono.error(new ShardNotOwnedException(shard));
     }
 
index 07fb885..2788343 100644 (file)
@@ -26,10 +26,13 @@ public class KafkaChatHome implements ChatHome
     int shard = selectShard(id);
     return chatRoomChannel
         .getChatRoom(shard, id)
-        .switchIfEmpty(Mono.error(() -> new UnknownChatroomException(
-            id,
-            shard,
-            chatRoomChannel.getOwnedShards())));
+        .switchIfEmpty(Mono.error(() -> {
+          log.error("Unknown chat-room {} (shard={})!", id, shard);
+          return new UnknownChatroomException(
+              id,
+              shard,
+              chatRoomChannel.getOwnedShards());
+        }));
   }
 
   int selectShard(UUID chatRoomId)
index 8e7cecb..bbe62a1 100644 (file)
@@ -8,6 +8,7 @@ import org.springframework.test.context.junit.jupiter.SpringExtension;
 import reactor.core.publisher.Mono;
 import reactor.util.retry.Retry;
 
+import java.time.Duration;
 import java.util.UUID;
 
 import static pl.rzrz.assertj.reactor.Assertions.assertThat;
@@ -30,7 +31,11 @@ public class ChatHomeTestBase
     // When
     Mono<ChatRoom> mono = chatHome
         .getChatRoom(chatRoomId)
-        .retryWhen(Retry.indefinitely().filter(throwable -> throwable instanceof LoadInProgressException));
+        .log()
+        .retryWhen(Retry
+            .backoff(5, Duration.ofSeconds(1))
+            .filter(throwable -> throwable instanceof LoadInProgressException))
+        .log();
 
     // Then
     assertThat(mono).emitsCount(1);
@@ -46,7 +51,11 @@ public class ChatHomeTestBase
     // When
     Mono<ChatRoom> mono = chatHome
         .getChatRoom(chatRoomId)
-        .retryWhen(Retry.indefinitely().filter(throwable -> throwable instanceof LoadInProgressException));
+        .log()
+        .retryWhen(Retry
+            .backoff(5, Duration.ofSeconds(1))
+            .filter(throwable -> throwable instanceof LoadInProgressException))
+        .log();
 
     // Then
     assertThat(mono).sendsError(e ->
index 7de73ca..7444570 100644 (file)
@@ -5,6 +5,7 @@ import org.junit.jupiter.api.Test;
 import reactor.core.publisher.Mono;
 import reactor.util.retry.Retry;
 
+import java.time.Duration;
 import java.util.UUID;
 
 import static pl.rzrz.assertj.reactor.Assertions.assertThat;
@@ -27,7 +28,11 @@ public class ChatHomeWithShardsTestBase extends ChatHomeTestBase
     // When
     Mono<ChatRoom> mono = chatHome
         .getChatRoom(chatRoomId)
-        .retryWhen(Retry.indefinitely().filter(throwable -> throwable instanceof LoadInProgressException));
+        .log()
+        .retryWhen(Retry
+            .backoff(5, Duration.ofSeconds(1))
+            .filter(throwable -> throwable instanceof LoadInProgressException))
+        .log();
 
     // Then
     assertThat(mono).sendsError(e ->
index 96b0cb3..856b2e2 100644 (file)
@@ -1,4 +1,4 @@
 logging:
   level:
     root: INFO
-    de.juplo.kafka.chat.backend: DEBUG
+    de.juplo.kafka.chat.backend: TRACE