X-Git-Url: http://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Ftest%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fchat%2Fbackend%2FTestListener.java;fp=src%2Ftest%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fchat%2Fbackend%2FTestListener.java;h=e58cb2b605e7f085ce96d82a2b4100c9d5409c37;hb=295c4aeab33547e7fc2445da7b7e3c0ae2504920;hp=78d4c82f74b2b5b1bbd343491ecc8efcecdf138b;hpb=a08403419e17e14585f4ce7d126ac89096e40629;p=demos%2Fkafka%2Fchat diff --git a/src/test/java/de/juplo/kafka/chat/backend/TestListener.java b/src/test/java/de/juplo/kafka/chat/backend/TestListener.java index 78d4c82f..e58cb2b6 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/TestListener.java +++ b/src/test/java/de/juplo/kafka/chat/backend/TestListener.java @@ -28,7 +28,7 @@ public class TestListener .fromArray(chatRooms) .flatMap(chatRoom -> { - log.info("Requesting messages from chat-room {}", chatRoom); + log.info("Requesting messages for chat-room {}", chatRoom); List list = new LinkedList<>(); receivedMessages.put(chatRoom.getId(), list); return receiveMessages(chatRoom) @@ -43,7 +43,9 @@ public class TestListener return Mono.error(e); } }) - .doOnNext(message -> list.add(message)); + .doOnNext(message -> list.add(message)) + .doOnComplete(() -> log.info("{} was completed!", chatRoom)) + .doOnError(throwalbe -> log.error("{} failed: {}", chatRoom, throwalbe)); }); }