FIX:take_vs_limitRate
[demos/kafka/chat] / src / test / java / de / juplo / kafka / chat / backend / TestListener.java
index 3f282ee..35f65ac 100644 (file)
@@ -29,6 +29,7 @@ public class TestListener
         .fromArray(chatRooms)
         .flatMap(chatRoom ->
         {
+          log.info("Requesting messages from chat-room {}", chatRoom);
           List<MessageTo> list = new LinkedList<>();
           receivedMessages.put(chatRoom.getId(), list);
           return receiveMessages(chatRoom)
@@ -48,14 +49,14 @@ public class TestListener
                 list.add(message);
                 log.info(
                     "Received a message from chat-room {}: {}",
-                    chatRoom,
+                    chatRoom.getName(),
                     message);
-              })
-              .take(30);
+              });
         })
+        .limitRate(10)
         .takeUntil(message -> !running)
         .doOnComplete(() -> log.info("TestListener is done"))
-        .parallel()
+        .parallel(chatRooms.length)
         .runOn(Schedulers.parallel())
         .then();
   }