test: HandoverIT-POC - fix for the blocking wait for `TestListener`
authorKai Moritz <kai@juplo.de>
Sun, 3 Mar 2024 09:20:11 +0000 (10:20 +0100)
committerKai Moritz <kai@juplo.de>
Thu, 14 Mar 2024 08:11:21 +0000 (09:11 +0100)
* Droped the waiting for `TestListener` alltogehter.
* The waiting can be droped, because waiting for the `TestWriter`-instances
  ensures, that all messages are send (and therefore very likely received)

src/test/java/de/juplo/kafka/chat/backend/AbstractHandoverIT.java
src/test/java/de/juplo/kafka/chat/backend/TestListener.java

index bbb2fbb..dff9b56 100644 (file)
@@ -57,9 +57,11 @@ public abstract class AbstractHandoverIT
     }
 
     TestListener testListener = new TestListener(port, chatRooms);
-    CompletableFuture<Void> testListenerFuture = testListener
+    testListener
         .run()
-        .toFuture();
+        .subscribe(message -> log.info(
+            "Received message: {}",
+            message));
 
     log.info("Sleeping for 3 seconds...");
     Thread.sleep(3000);
@@ -71,13 +73,8 @@ public abstract class AbstractHandoverIT
       log.info("Joined TestWriter {}", testWriters[i].user);
     }
 
-
-    log.info("Sleeping for 3 seconds...");
-    Thread.sleep(3000);
-    log.info("Joining TestListener...");
-    testListener.running = false;
-    testListenerFuture.join();
-    log.info("Joined TestListener");
+    // Yield the work, so that the last messages can be received
+    Thread.sleep(500);
   }
 
   Mono<ChatRoomInfoTo> createChatRoom(String name)
index 35f65ac..78d4c82 100644 (file)
@@ -12,7 +12,6 @@ import org.springframework.http.codec.ServerSentEvent;
 import org.springframework.web.reactive.function.client.WebClient;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
-import reactor.core.scheduler.Schedulers;
 
 import java.util.*;
 
@@ -23,7 +22,7 @@ public class TestListener
   static final ParameterizedTypeReference<ServerSentEvent<String>> SSE_TYPE = new ParameterizedTypeReference<>() {};
 
 
-  public Mono<Void> run()
+  public Flux<MessageTo> run()
   {
     return Flux
         .fromArray(chatRooms)
@@ -44,21 +43,8 @@ public class TestListener
                   return Mono.error(e);
                 }
               })
-              .doOnNext(message ->
-              {
-                list.add(message);
-                log.info(
-                    "Received a message from chat-room {}: {}",
-                    chatRoom.getName(),
-                    message);
-              });
-        })
-        .limitRate(10)
-        .takeUntil(message -> !running)
-        .doOnComplete(() -> log.info("TestListener is done"))
-        .parallel(chatRooms.length)
-        .runOn(Schedulers.parallel())
-        .then();
+              .doOnNext(message -> list.add(message));
+        });
   }
 
   Flux<ServerSentEvent<String>> receiveMessages(ChatRoomInfoTo chatRoom)
@@ -80,8 +66,6 @@ public class TestListener
 
   final Map<UUID, List<MessageTo>> receivedMessages = new HashMap<>();
 
-  volatile boolean running = true;
-
 
   TestListener(Integer port, ChatRoomInfoTo[] chatRooms)
   {