X-Git-Url: http://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Ftest%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fchat%2Fbackend%2FTestListener.java;h=1c993c17059cd6bcbfd0aa90ad37c24a3504656a;hb=09995381a4d5667cebd9d5dfdb915dcd48f169b5;hp=0ea86287de44a7cf40771d62a2d7c8dac072f45d;hpb=25fcd0e75de4fcadf400e2f1288d7d0c191be4fb;p=demos%2Fkafka%2Fchat diff --git a/src/test/java/de/juplo/kafka/chat/backend/TestListener.java b/src/test/java/de/juplo/kafka/chat/backend/TestListener.java index 0ea86287..1c993c17 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/TestListener.java +++ b/src/test/java/de/juplo/kafka/chat/backend/TestListener.java @@ -44,9 +44,14 @@ public class TestListener implements Runnable return Mono.error(e); } }) + .doOnNext(message -> log.info( + "Received a message from chat-room {}: {}", + chatRoom, + message)) .take(30)) .takeUntil(message -> !running) - .subscribe(message -> log.info("Received message: {}", message)); + .then() + .block(); } Flux> receiveMessages(ChatRoomInfoTo chatRoom)