From: Kai Moritz Date: Fri, 15 Mar 2024 15:10:31 +0000 (+0100) Subject: test: HandoverIT-POC - FIX: `TestWriter` must use `Flux#concatMap()` X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=1af73fed7db47bbc624c0004f140597a48149bed;p=demos%2Fkafka%2Fchat test: HandoverIT-POC - FIX: `TestWriter` must use `Flux#concatMap()` * `Flux#flatMap()` executes in parallel. Hence, the enqueued messages are _not_ send in the order, that they were enqueued. * Therefore, the `TestWriter` was refactored to use `Flux#concatMap()`, which executes serially. --- diff --git a/src/test/java/de/juplo/kafka/chat/backend/TestWriter.java b/src/test/java/de/juplo/kafka/chat/backend/TestWriter.java index a6ac5696..aa3c6c10 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/TestWriter.java +++ b/src/test/java/de/juplo/kafka/chat/backend/TestWriter.java @@ -44,8 +44,9 @@ public class TestWriter }) .delayElements(Duration.ofMillis(ThreadLocalRandom.current().nextLong(500, 1500))) .map(i -> "Message #" + i) - .flatMap(message -> sendMessage(chatRoom, message) - .retryWhen(Retry.fixedDelay(30, Duration.ofSeconds(1)))) + .concatMap(message -> sendMessage(chatRoom, message) + .log(user.getName()) + .retryWhen(Retry.fixedDelay(60, Duration.ofSeconds(1)))) .doOnNext(message -> { numSentMessages++;