DIRTYFIX:subscribe
[demos/kafka/chat] / src / test / java / de / juplo / kafka / chat / backend / TestWriter.java
index 08634b8..0d2c41e 100644 (file)
@@ -9,21 +9,23 @@ import org.springframework.web.reactive.function.client.WebClient;
 import org.springframework.web.reactive.function.client.WebClientResponseException;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
 import reactor.util.retry.Retry;
 
 import java.nio.charset.Charset;
 import java.time.Duration;
 import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
 import java.util.concurrent.ThreadLocalRandom;
 
 
 @Slf4j
-public class TestWriter implements Runnable
+public class TestWriter
 {
-  @Override
-  public void run()
+  public Mono<Void> run()
   {
-    Flux
+    return Flux
         .fromIterable((Iterable<Integer>) () -> new Iterator<>()
         {
           private int i = 0;
@@ -40,10 +42,19 @@ public class TestWriter implements Runnable
             return i++;
           }
         })
+        .delayElements(Duration.ofMillis(ThreadLocalRandom.current().nextLong(500, 1500)))
         .map(i -> "Message #" + i)
         .flatMap(message -> sendMessage(chatRoom, message)
-            .delayElement(Duration.ofMillis(ThreadLocalRandom.current().nextLong(500, 1500)))
             .retryWhen(Retry.fixedDelay(10, Duration.ofSeconds(1))))
+        .doOnNext(message ->
+        {
+          sentMessages.add(message);
+          log.info(
+              "{} sent a message to {}: {}",
+             user,
+             chatRoom,
+             message);
+        })
         .doOnError(throwable ->
         {
           WebClientResponseException e = (WebClientResponseException)throwable.getCause();
@@ -52,13 +63,12 @@ public class TestWriter implements Runnable
               user,
               e.getResponseBodyAsString(Charset.defaultCharset()));
         })
-        .doOnNext(message -> log.info(
-            "{} sent a message to {}: {}",
-            user,
-            chatRoom,
-            message))
-        .then()
-        .block();
+        .limitRate(1)
+        .takeUntil(message -> !running)
+        .doOnComplete(() -> log.info("TestWriter {} is done", user))
+        .parallel(1)
+        .runOn(Schedulers.parallel())
+        .then();
   }
 
   private Mono<MessageTo> sendMessage(
@@ -91,7 +101,9 @@ public class TestWriter implements Runnable
 
   private final WebClient webClient;
   private final ChatRoomInfoTo chatRoom;
-  private final User user;
+
+  final User user;
+  final List<MessageTo> sentMessages = new LinkedList<>();
 
   volatile boolean running = true;