From bce5ce999de007bb43c28f908fceef0646ac513c Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sun, 3 Mar 2024 10:20:11 +0100 Subject: [PATCH] DIRTYFIX:subscribe --- .../kafka/chat/backend/AbstractHandoverIT.java | 14 ++++---------- .../de/juplo/kafka/chat/backend/TestListener.java | 8 ++------ 2 files changed, 6 insertions(+), 16 deletions(-) diff --git a/src/test/java/de/juplo/kafka/chat/backend/AbstractHandoverIT.java b/src/test/java/de/juplo/kafka/chat/backend/AbstractHandoverIT.java index bbb2fbb5..b55a4c04 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/AbstractHandoverIT.java +++ b/src/test/java/de/juplo/kafka/chat/backend/AbstractHandoverIT.java @@ -57,9 +57,11 @@ public abstract class AbstractHandoverIT } TestListener testListener = new TestListener(port, chatRooms); - CompletableFuture testListenerFuture = testListener + testListener .run() - .toFuture(); + .subscribe(message -> log.info( + "Received message: {}", + message)); log.info("Sleeping for 3 seconds..."); Thread.sleep(3000); @@ -70,14 +72,6 @@ public abstract class AbstractHandoverIT testWriterFutures[i].join(); log.info("Joined TestWriter {}", testWriters[i].user); } - - - log.info("Sleeping for 3 seconds..."); - Thread.sleep(3000); - log.info("Joining TestListener..."); - testListener.running = false; - testListenerFuture.join(); - log.info("Joined TestListener"); } Mono createChatRoom(String name) diff --git a/src/test/java/de/juplo/kafka/chat/backend/TestListener.java b/src/test/java/de/juplo/kafka/chat/backend/TestListener.java index 35f65acd..f01e9b57 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/TestListener.java +++ b/src/test/java/de/juplo/kafka/chat/backend/TestListener.java @@ -23,7 +23,7 @@ public class TestListener static final ParameterizedTypeReference> SSE_TYPE = new ParameterizedTypeReference<>() {}; - public Mono run() + public Flux run() { return Flux .fromArray(chatRooms) @@ -53,12 +53,8 @@ public class TestListener message); }); }) - .limitRate(10) .takeUntil(message -> !running) - .doOnComplete(() -> log.info("TestListener is done")) - .parallel(chatRooms.length) - .runOn(Schedulers.parallel()) - .then(); + .doOnComplete(() -> log.info("TestListener is done")); } Flux> receiveMessages(ChatRoomInfoTo chatRoom) -- 2.20.1