]> juplo.de Git - demos/kafka/chat/commitdiff
FUCKING SHIIIIT!
authorKai Moritz <kai@juplo.de>
Wed, 30 Aug 2023 17:01:05 +0000 (19:01 +0200)
committerKai Moritz <kai@juplo.de>
Wed, 30 Aug 2023 17:01:05 +0000 (19:01 +0200)
src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomChannel.java
src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHome.java
src/test/java/de/juplo/kafka/chat/backend/domain/ChatHomeTestBase.java
src/test/java/de/juplo/kafka/chat/backend/domain/ChatHomeWithShardsTestBase.java
src/test/resources/application.yml

index 234554ebbdd267ae1adefbc90d95190b7c99b767..92f294b23c7b46b2d6a33b82c6442b0d51db35e7 100644 (file)
@@ -369,11 +369,13 @@ public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener
   {
     if (loadInProgress)
     {
+      log.error("Load in progress!");
       return Mono.error(new LoadInProgressException());
     }
 
     if (!isShardOwned[shard])
     {
+      log.error("Shard {} for chat-room {] not owned!", shard, id);
       return Mono.error(new ShardNotOwnedException(shard));
     }
 
index 07fb8858744f7853382db7a4c2cccfe42e1def54..2788343f7b607c63f653ffe403917c7fc449be6a 100644 (file)
@@ -26,10 +26,13 @@ public class KafkaChatHome implements ChatHome
     int shard = selectShard(id);
     return chatRoomChannel
         .getChatRoom(shard, id)
-        .switchIfEmpty(Mono.error(() -> new UnknownChatroomException(
-            id,
-            shard,
-            chatRoomChannel.getOwnedShards())));
+        .switchIfEmpty(Mono.error(() -> {
+          log.error("Unknown chat-room {} (shard={})!", id, shard);
+          return new UnknownChatroomException(
+              id,
+              shard,
+              chatRoomChannel.getOwnedShards());
+        }));
   }
 
   int selectShard(UUID chatRoomId)
index 8e7cecb227acb08f447a9dba6550d57e644ac052..bbe62a174cd9f5ce6f1662a85f76efdc1b35a9c1 100644 (file)
@@ -8,6 +8,7 @@ import org.springframework.test.context.junit.jupiter.SpringExtension;
 import reactor.core.publisher.Mono;
 import reactor.util.retry.Retry;
 
+import java.time.Duration;
 import java.util.UUID;
 
 import static pl.rzrz.assertj.reactor.Assertions.assertThat;
@@ -30,7 +31,11 @@ public class ChatHomeTestBase
     // When
     Mono<ChatRoom> mono = chatHome
         .getChatRoom(chatRoomId)
-        .retryWhen(Retry.indefinitely().filter(throwable -> throwable instanceof LoadInProgressException));
+        .log()
+        .retryWhen(Retry
+            .backoff(5, Duration.ofSeconds(1))
+            .filter(throwable -> throwable instanceof LoadInProgressException))
+        .log();
 
     // Then
     assertThat(mono).emitsCount(1);
@@ -46,7 +51,11 @@ public class ChatHomeTestBase
     // When
     Mono<ChatRoom> mono = chatHome
         .getChatRoom(chatRoomId)
-        .retryWhen(Retry.indefinitely().filter(throwable -> throwable instanceof LoadInProgressException));
+        .log()
+        .retryWhen(Retry
+            .backoff(5, Duration.ofSeconds(1))
+            .filter(throwable -> throwable instanceof LoadInProgressException))
+        .log();
 
     // Then
     assertThat(mono).sendsError(e ->
index 7de73cad8fc1eda5bf61cbdb38d42e031d9f680e..7444570845cd48215e43dc0f5fc5ce732378a812 100644 (file)
@@ -5,6 +5,7 @@ import org.junit.jupiter.api.Test;
 import reactor.core.publisher.Mono;
 import reactor.util.retry.Retry;
 
+import java.time.Duration;
 import java.util.UUID;
 
 import static pl.rzrz.assertj.reactor.Assertions.assertThat;
@@ -27,7 +28,11 @@ public class ChatHomeWithShardsTestBase extends ChatHomeTestBase
     // When
     Mono<ChatRoom> mono = chatHome
         .getChatRoom(chatRoomId)
-        .retryWhen(Retry.indefinitely().filter(throwable -> throwable instanceof LoadInProgressException));
+        .log()
+        .retryWhen(Retry
+            .backoff(5, Duration.ofSeconds(1))
+            .filter(throwable -> throwable instanceof LoadInProgressException))
+        .log();
 
     // Then
     assertThat(mono).sendsError(e ->
index 96b0cb3b0f319e2432c2b8b780a6f8ab0d66400b..856b2e2441c33fc01dc461ca0915db857fc2fab8 100644 (file)
@@ -1,4 +1,4 @@
 logging:
   level:
     root: INFO
-    de.juplo.kafka.chat.backend: DEBUG
+    de.juplo.kafka.chat.backend: TRACE