WIP:test: `*ConfigurationIT` asserts, if restored messages can be seen
[demos/kafka/chat] / src / test / java / de / juplo / kafka / chat / backend / TestWriter.java
index 8f7bc81..7d11c1d 100644 (file)
@@ -2,6 +2,7 @@ package de.juplo.kafka.chat.backend;
 
 import de.juplo.kafka.chat.backend.api.ChatRoomInfoTo;
 import de.juplo.kafka.chat.backend.api.MessageTo;
+import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.http.HttpStatus;
 import org.springframework.http.MediaType;
@@ -42,12 +43,13 @@ public class TestWriter
             return i++;
           }
         })
+        .delayElements(Duration.ofMillis(ThreadLocalRandom.current().nextLong(500, 1500)))
         .map(i -> "Message #" + i)
         .flatMap(message -> sendMessage(chatRoom, message)
-            .delayElement(Duration.ofMillis(ThreadLocalRandom.current().nextLong(500, 1500)))
-            .retryWhen(Retry.fixedDelay(10, Duration.ofSeconds(1))))
+            .retryWhen(Retry.fixedDelay(30, Duration.ofSeconds(1))))
         .doOnNext(message ->
         {
+          numSentMessages++;
           sentMessages.add(message);
           log.info(
               "{} sent a message to {}: {}",
@@ -63,6 +65,7 @@ public class TestWriter
               user,
               e.getResponseBodyAsString(Charset.defaultCharset()));
         })
+        .limitRate(1)
         .takeUntil(message -> !running)
         .doOnComplete(() -> log.info("TestWriter {} is done", user))
         .parallel(1)
@@ -99,12 +102,14 @@ public class TestWriter
 
 
   private final WebClient webClient;
-  private final ChatRoomInfoTo chatRoom;
 
+  final ChatRoomInfoTo chatRoom;
   final User user;
   final List<MessageTo> sentMessages = new LinkedList<>();
 
   volatile boolean running = true;
+  @Getter
+  private volatile int numSentMessages = 0;
 
 
   TestWriter(Integer port, ChatRoomInfoTo chatRoom, String username)