WIP
authorKai Moritz <kai@juplo.de>
Sat, 2 Mar 2024 09:15:11 +0000 (10:15 +0100)
committerKai Moritz <kai@juplo.de>
Sat, 2 Mar 2024 09:15:11 +0000 (10:15 +0100)
src/test/java/de/juplo/kafka/chat/backend/AbstractHandoverIT.java
src/test/java/de/juplo/kafka/chat/backend/TestListener.java
src/test/java/de/juplo/kafka/chat/backend/TestWriter.java

index 87b1770..bbb2fbb 100644 (file)
@@ -61,16 +61,20 @@ public abstract class AbstractHandoverIT
         .run()
         .toFuture();
 
-    log.info("Sleepint for 3 seconds...");
+    log.info("Sleeping for 3 seconds...");
     Thread.sleep(3000);
 
     for (int i = 0; i < NUM_CLIENTS; i++)
     {
       testWriters[i].running = false;
       testWriterFutures[i].join();
-      log.info("Joined TestWriter {}", testWriters[i]);
+      log.info("Joined TestWriter {}", testWriters[i].user);
     }
 
+
+    log.info("Sleeping for 3 seconds...");
+    Thread.sleep(3000);
+    log.info("Joining TestListener...");
     testListener.running = false;
     testListenerFuture.join();
     log.info("Joined TestListener");
index 3f282ee..e413c52 100644 (file)
@@ -29,6 +29,7 @@ public class TestListener
         .fromArray(chatRooms)
         .flatMap(chatRoom ->
         {
+          log.info("Requesting messages from chat-room {}", chatRoom);
           List<MessageTo> list = new LinkedList<>();
           receivedMessages.put(chatRoom.getId(), list);
           return receiveMessages(chatRoom)
@@ -48,14 +49,15 @@ public class TestListener
                 list.add(message);
                 log.info(
                     "Received a message from chat-room {}: {}",
-                    chatRoom,
+                    chatRoom.getName(),
                     message);
               })
-              .take(30);
+              .take(10);
         })
+        .take(100)
         .takeUntil(message -> !running)
         .doOnComplete(() -> log.info("TestListener is done"))
-        .parallel()
+        .parallel(chatRooms.length)
         .runOn(Schedulers.parallel())
         .then();
   }
index ab3713c..8f7bc81 100644 (file)
@@ -65,7 +65,7 @@ public class TestWriter
         })
         .takeUntil(message -> !running)
         .doOnComplete(() -> log.info("TestWriter {} is done", user))
-        .parallel()
+        .parallel(1)
         .runOn(Schedulers.parallel())
         .then();
   }