DIRTYFIX:subscribe
authorKai Moritz <kai@juplo.de>
Sun, 3 Mar 2024 09:20:11 +0000 (10:20 +0100)
committerKai Moritz <kai@juplo.de>
Sun, 3 Mar 2024 09:20:11 +0000 (10:20 +0100)
src/test/java/de/juplo/kafka/chat/backend/AbstractHandoverIT.java
src/test/java/de/juplo/kafka/chat/backend/TestListener.java

index bbb2fbb..b55a4c0 100644 (file)
@@ -57,9 +57,11 @@ public abstract class AbstractHandoverIT
     }
 
     TestListener testListener = new TestListener(port, chatRooms);
-    CompletableFuture<Void> testListenerFuture = testListener
+    testListener
         .run()
-        .toFuture();
+        .subscribe(message -> log.info(
+            "Received message: {}",
+            message));
 
     log.info("Sleeping for 3 seconds...");
     Thread.sleep(3000);
@@ -70,14 +72,6 @@ public abstract class AbstractHandoverIT
       testWriterFutures[i].join();
       log.info("Joined TestWriter {}", testWriters[i].user);
     }
-
-
-    log.info("Sleeping for 3 seconds...");
-    Thread.sleep(3000);
-    log.info("Joining TestListener...");
-    testListener.running = false;
-    testListenerFuture.join();
-    log.info("Joined TestListener");
   }
 
   Mono<ChatRoomInfoTo> createChatRoom(String name)
index 35f65ac..f01e9b5 100644 (file)
@@ -23,7 +23,7 @@ public class TestListener
   static final ParameterizedTypeReference<ServerSentEvent<String>> SSE_TYPE = new ParameterizedTypeReference<>() {};
 
 
-  public Mono<Void> run()
+  public Flux<MessageTo> run()
   {
     return Flux
         .fromArray(chatRooms)
@@ -53,12 +53,8 @@ public class TestListener
                     message);
               });
         })
-        .limitRate(10)
         .takeUntil(message -> !running)
-        .doOnComplete(() -> log.info("TestListener is done"))
-        .parallel(chatRooms.length)
-        .runOn(Schedulers.parallel())
-        .then();
+        .doOnComplete(() -> log.info("TestListener is done"));
   }
 
   Flux<ServerSentEvent<String>> receiveMessages(ChatRoomInfoTo chatRoom)