test: HandoverIT-POC - Refactored `TestWriter` to use a `Flux`
authorKai Moritz <kai@juplo.de>
Fri, 1 Mar 2024 13:53:43 +0000 (14:53 +0100)
committerKai Moritz <kai@juplo.de>
Fri, 1 Mar 2024 18:53:02 +0000 (19:53 +0100)
src/test/java/de/juplo/kafka/chat/backend/TestWriter.java

index 929d84f..08634b8 100644 (file)
@@ -7,11 +7,13 @@ import org.springframework.http.HttpStatus;
 import org.springframework.http.MediaType;
 import org.springframework.web.reactive.function.client.WebClient;
 import org.springframework.web.reactive.function.client.WebClientResponseException;
+import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 import reactor.util.retry.Retry;
 
 import java.nio.charset.Charset;
 import java.time.Duration;
+import java.util.Iterator;
 import java.util.concurrent.ThreadLocalRandom;
 
 
@@ -21,32 +23,42 @@ public class TestWriter implements Runnable
   @Override
   public void run()
   {
-    for (int i = 0; running; i++)
-    {
-      String message = "Message #" + i;
-      try
-      {
-        sendMessage(chatRoom, message)
-            .retryWhen(Retry.fixedDelay(10, Duration.ofSeconds(1)))
-            .map(MessageTo::toString)
-            .onErrorResume(throwable ->
-            {
-              WebClientResponseException e = (WebClientResponseException)throwable.getCause();
-              return Mono.just(e.getResponseBodyAsString(Charset.defaultCharset()));
-            })
-            .subscribe(result -> log.info(
-                "{} sent a message to {}: {}",
-                user,
-                chatRoom,
-                result));
+    Flux
+        .fromIterable((Iterable<Integer>) () -> new Iterator<>()
+        {
+          private int i = 0;
+
+          @Override
+          public boolean hasNext()
+          {
+            return running;
+          }
 
-        Thread.sleep(ThreadLocalRandom.current().nextLong(700, 1000));
-      }
-      catch (Exception e)
-      {
-        throw new RuntimeException(e);
-      }
-    }
+          @Override
+          public Integer next()
+          {
+            return i++;
+          }
+        })
+        .map(i -> "Message #" + i)
+        .flatMap(message -> sendMessage(chatRoom, message)
+            .delayElement(Duration.ofMillis(ThreadLocalRandom.current().nextLong(500, 1500)))
+            .retryWhen(Retry.fixedDelay(10, Duration.ofSeconds(1))))
+        .doOnError(throwable ->
+        {
+          WebClientResponseException e = (WebClientResponseException)throwable.getCause();
+          log.error(
+              "{} failed sending a message: {}",
+              user,
+              e.getResponseBodyAsString(Charset.defaultCharset()));
+        })
+        .doOnNext(message -> log.info(
+            "{} sent a message to {}: {}",
+            user,
+            chatRoom,
+            message))
+        .then()
+        .block();
   }
 
   private Mono<MessageTo> sendMessage(