From: Kai Moritz Date: Sun, 25 Feb 2024 08:46:57 +0000 (+0100) Subject: WIP:wait--FIX X-Git-Tag: rebase--2024-02-26--19-46~10 X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=b2f5400a4d18b59f017db1fc0c827152c33056ae;p=demos%2Fkafka%2Fchat WIP:wait--FIX --- diff --git a/src/test/java/de/juplo/kafka/chat/backend/KafkaHandoverIT.java b/src/test/java/de/juplo/kafka/chat/backend/KafkaHandoverIT.java index b84d540e..72f8436a 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/KafkaHandoverIT.java +++ b/src/test/java/de/juplo/kafka/chat/backend/KafkaHandoverIT.java @@ -36,7 +36,11 @@ class KafkaHandoverIT extends AbstractHandoverIT User user = new User("nerd"); IntStream .rangeClosed(1,100) - .forEach(i ->sendMessage(chatRoom, user, "Message #" + i)); + .mapToObj(i ->sendMessage(chatRoom, user, "Message #" + i)) + .map(result -> result + .map(MessageTo::toString) + .block()) + .forEach(result -> log.info("{}", result)); Thread.sleep(10000); receiveMessage(chatRoom).subscribe(message -> log.info("message: {}", message)); @@ -101,7 +105,8 @@ class KafkaHandoverIT extends AbstractHandoverIT "/{chatRoomId}", chatRoom.getId()) .accept(MediaType.APPLICATION_OCTET_STREAM) - .retrieve().bodyToFlux(byte[].class); + .retrieve() + .bodyToFlux(byte[].class); } @BeforeEach