DIRTYFIX:subscribe
[demos/kafka/chat] / src / test / java / de / juplo / kafka / chat / backend / TestListener.java
index 3f282ee..f01e9b5 100644 (file)
@@ -23,12 +23,13 @@ public class TestListener
   static final ParameterizedTypeReference<ServerSentEvent<String>> SSE_TYPE = new ParameterizedTypeReference<>() {};
 
 
-  public Mono<Void> run()
+  public Flux<MessageTo> run()
   {
     return Flux
         .fromArray(chatRooms)
         .flatMap(chatRoom ->
         {
+          log.info("Requesting messages from chat-room {}", chatRoom);
           List<MessageTo> list = new LinkedList<>();
           receivedMessages.put(chatRoom.getId(), list);
           return receiveMessages(chatRoom)
@@ -48,16 +49,12 @@ public class TestListener
                 list.add(message);
                 log.info(
                     "Received a message from chat-room {}: {}",
-                    chatRoom,
+                    chatRoom.getName(),
                     message);
-              })
-              .take(30);
+              });
         })
         .takeUntil(message -> !running)
-        .doOnComplete(() -> log.info("TestListener is done"))
-        .parallel()
-        .runOn(Schedulers.parallel())
-        .then();
+        .doOnComplete(() -> log.info("TestListener is done"));
   }
 
   Flux<ServerSentEvent<String>> receiveMessages(ChatRoomInfoTo chatRoom)