test: HandoverIT-POC - Executing the Flux/Mono asynchrounusly
authorKai Moritz <kai@juplo.de>
Fri, 22 Mar 2024 15:10:20 +0000 (16:10 +0100)
committerKai Moritz <kai@juplo.de>
Fri, 22 Mar 2024 15:48:31 +0000 (16:48 +0100)
src/test/java/de/juplo/kafka/chat/backend/AbstractHandoverIT.java
src/test/java/de/juplo/kafka/chat/backend/TestListener.java
src/test/java/de/juplo/kafka/chat/backend/TestWriter.java

index ba1b5c5..53085f7 100644 (file)
@@ -4,18 +4,14 @@ import de.juplo.kafka.chat.backend.api.ChatRoomInfoTo;
 import lombok.extern.slf4j.Slf4j;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
-import org.springframework.core.ParameterizedTypeReference;
 import org.springframework.http.HttpStatus;
 import org.springframework.http.MediaType;
-import org.springframework.http.codec.ServerSentEvent;
 import org.springframework.web.reactive.function.client.WebClient;
 import org.testcontainers.junit.jupiter.Testcontainers;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
-import java.util.Arrays;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
+import java.util.concurrent.CompletableFuture;
 
 
 @Testcontainers
@@ -27,7 +23,6 @@ public abstract class AbstractHandoverIT
 
 
   private final AbstractHandoverITContainers containers;
-  private final ExecutorService executorService = Executors.newFixedThreadPool(NUM_CLIENTS + 1);
 
 
   AbstractHandoverIT(AbstractHandoverITContainers containers)
@@ -47,25 +42,42 @@ public abstract class AbstractHandoverIT
 
     int port = containers.haproxy.getMappedPort(8400);
 
-    TestWriter[] testWriters = Flux
-        .range(0, NUM_CLIENTS)
-        .map(i -> new TestWriter(
-            port,
-            chatRooms[i % NUM_CHATROOMS],
-            "user-" + Integer.toString(i)))
-        .doOnNext(testClient -> executorService.execute(testClient))
-        .toStream()
-        .toArray(size -> new TestWriter[size]);
+    CompletableFuture<Void>[] testWriterFutures = new CompletableFuture[NUM_CLIENTS];
+    TestWriter[] testWriters = new TestWriter[NUM_CLIENTS];
+    for (int i = 0; i < NUM_CLIENTS; i++)
+    {
+      TestWriter testWriter = new TestWriter(
+          port,
+          chatRooms[i % NUM_CHATROOMS],
+          "user-" + i);
+      testWriters[i] = testWriter;
+      testWriterFutures[i] = testWriter
+          .run()
+          .toFuture();
+    }
 
     TestListener testListener = new TestListener(port, chatRooms);
-    executorService.execute(testListener);
+    CompletableFuture<Void> testListenerFuture = testListener
+        .run()
+        .toFuture();
 
+    log.info("Sleeping for 2 seconds...");
     Thread.sleep(2000);
 
-    Arrays
-        .stream(testWriters)
-        .forEach(testClient -> testClient.running = false);
+    for (int i = 0; i < NUM_CLIENTS; i++)
+    {
+      testWriters[i].running = false;
+      testWriterFutures[i].join();
+      log.info("Joined TestWriter {}", testWriters[i].user);
+    }
+
+
+    log.info("Sleeping for 2 seconds...");
+    Thread.sleep(2000);
+    log.info("Joining TestListener...");
     testListener.running = false;
+    testListenerFuture.join();
+    log.info("Joined TestListener");
   }
 
   Mono<ChatRoomInfoTo> createChatRoom(String name)
index 4a8ba0b..35f65ac 100644 (file)
@@ -12,23 +12,24 @@ import org.springframework.http.codec.ServerSentEvent;
 import org.springframework.web.reactive.function.client.WebClient;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
 
 import java.util.*;
 
 
 @Slf4j
-public class TestListener implements Runnable
+public class TestListener
 {
   static final ParameterizedTypeReference<ServerSentEvent<String>> SSE_TYPE = new ParameterizedTypeReference<>() {};
 
 
-  @Override
-  public void run()
+  public Mono<Void> run()
   {
-    Flux
+    return Flux
         .fromArray(chatRooms)
         .flatMap(chatRoom ->
         {
+          log.info("Requesting messages from chat-room {}", chatRoom);
           List<MessageTo> list = new LinkedList<>();
           receivedMessages.put(chatRoom.getId(), list);
           return receiveMessages(chatRoom)
@@ -48,14 +49,16 @@ public class TestListener implements Runnable
                 list.add(message);
                 log.info(
                     "Received a message from chat-room {}: {}",
-                    chatRoom,
+                    chatRoom.getName(),
                     message);
-              })
-              .limitRate(10);
+              });
         })
+        .limitRate(10)
         .takeUntil(message -> !running)
-        .then()
-        .block();
+        .doOnComplete(() -> log.info("TestListener is done"))
+        .parallel(chatRooms.length)
+        .runOn(Schedulers.parallel())
+        .then();
   }
 
   Flux<ServerSentEvent<String>> receiveMessages(ChatRoomInfoTo chatRoom)
index 559cd36..0d2c41e 100644 (file)
@@ -9,6 +9,7 @@ import org.springframework.web.reactive.function.client.WebClient;
 import org.springframework.web.reactive.function.client.WebClientResponseException;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
 import reactor.util.retry.Retry;
 
 import java.nio.charset.Charset;
@@ -20,12 +21,11 @@ import java.util.concurrent.ThreadLocalRandom;
 
 
 @Slf4j
-public class TestWriter implements Runnable
+public class TestWriter
 {
-  @Override
-  public void run()
+  public Mono<Void> run()
   {
-    Flux
+    return Flux
         .fromIterable((Iterable<Integer>) () -> new Iterator<>()
         {
           private int i = 0;
@@ -42,9 +42,9 @@ public class TestWriter implements Runnable
             return i++;
           }
         })
+        .delayElements(Duration.ofMillis(ThreadLocalRandom.current().nextLong(500, 1500)))
         .map(i -> "Message #" + i)
         .flatMap(message -> sendMessage(chatRoom, message)
-            .delayElement(Duration.ofMillis(ThreadLocalRandom.current().nextLong(500, 1500)))
             .retryWhen(Retry.fixedDelay(10, Duration.ofSeconds(1))))
         .doOnNext(message ->
         {
@@ -63,8 +63,12 @@ public class TestWriter implements Runnable
               user,
               e.getResponseBodyAsString(Charset.defaultCharset()));
         })
-        .then()
-        .block();
+        .limitRate(1)
+        .takeUntil(message -> !running)
+        .doOnComplete(() -> log.info("TestWriter {} is done", user))
+        .parallel(1)
+        .runOn(Schedulers.parallel())
+        .then();
   }
 
   private Mono<MessageTo> sendMessage(
@@ -97,8 +101,8 @@ public class TestWriter implements Runnable
 
   private final WebClient webClient;
   private final ChatRoomInfoTo chatRoom;
-  private final User user;
 
+  final User user;
   final List<MessageTo> sentMessages = new LinkedList<>();
 
   volatile boolean running = true;