DIRTYFIX:subscribe
[demos/kafka/chat] / src / test / java / de / juplo / kafka / chat / backend / TestWriter.java
index 9253ede..0d2c41e 100644 (file)
@@ -42,9 +42,9 @@ public class TestWriter
             return i++;
           }
         })
+        .delayElements(Duration.ofMillis(ThreadLocalRandom.current().nextLong(500, 1500)))
         .map(i -> "Message #" + i)
         .flatMap(message -> sendMessage(chatRoom, message)
-            .delayElement(Duration.ofMillis(ThreadLocalRandom.current().nextLong(500, 1500)))
             .retryWhen(Retry.fixedDelay(10, Duration.ofSeconds(1))))
         .doOnNext(message ->
         {
@@ -63,8 +63,10 @@ public class TestWriter
               user,
               e.getResponseBodyAsString(Charset.defaultCharset()));
         })
+        .limitRate(1)
         .takeUntil(message -> !running)
-        .parallel()
+        .doOnComplete(() -> log.info("TestWriter {} is done", user))
+        .parallel(1)
         .runOn(Schedulers.parallel())
         .then();
   }
@@ -99,8 +101,8 @@ public class TestWriter
 
   private final WebClient webClient;
   private final ChatRoomInfoTo chatRoom;
-  private final User user;
 
+  final User user;
   final List<MessageTo> sentMessages = new LinkedList<>();
 
   volatile boolean running = true;