From dd10508813ee6bc2f1658d30f52777d405da653b Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Tue, 27 Feb 2024 21:56:45 +0100 Subject: [PATCH] test: HandoverIT-POC - messages are written to 23 chat-rooms instead of 1 --- .../chat/backend/AbstractHandoverIT.java | 15 ++++++++--- .../juplo/kafka/chat/backend/TestClient.java | 25 ++++++++++--------- 2 files changed, 24 insertions(+), 16 deletions(-) diff --git a/src/test/java/de/juplo/kafka/chat/backend/AbstractHandoverIT.java b/src/test/java/de/juplo/kafka/chat/backend/AbstractHandoverIT.java index 2e4faa1b..900a7c39 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/AbstractHandoverIT.java +++ b/src/test/java/de/juplo/kafka/chat/backend/AbstractHandoverIT.java @@ -19,6 +19,7 @@ import reactor.core.publisher.Mono; public abstract class AbstractHandoverIT { static final ParameterizedTypeReference> SSE_TYPE = new ParameterizedTypeReference<>() {}; + static final int NUM_CHATROOMS = 23; private final AbstractHandoverITContainers containers; @@ -33,15 +34,21 @@ public abstract class AbstractHandoverIT @Test void test() throws InterruptedException { - ChatRoomInfoTo chatRoom = createChatRoom("bar").block(); + ChatRoomInfoTo[] chatRooms = Flux + .range(0, NUM_CHATROOMS) + .flatMap(i -> createChatRoom("#" + i)) + .toStream() + .toArray(size -> new ChatRoomInfoTo[size]); + TestClient testClient = new TestClient( containers.haproxy.getMappedPort(8400), - chatRoom, + chatRooms, "nerd"); testClient.run(); - receiveMessages(chatRoom) - .take(100) + Flux + .fromArray(chatRooms) + .flatMap(chatRoom ->receiveMessages(chatRoom).take(100)) .doOnNext(message -> log.info("message: {}", message)) .then() .block(); diff --git a/src/test/java/de/juplo/kafka/chat/backend/TestClient.java b/src/test/java/de/juplo/kafka/chat/backend/TestClient.java index 5dde9981..4aad7f81 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/TestClient.java +++ b/src/test/java/de/juplo/kafka/chat/backend/TestClient.java @@ -6,11 +6,11 @@ import lombok.extern.slf4j.Slf4j; import org.springframework.http.HttpStatus; import org.springframework.http.MediaType; import org.springframework.web.reactive.function.client.WebClient; +import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.util.retry.Retry; import java.time.Duration; -import java.util.stream.IntStream; @Slf4j @@ -18,14 +18,15 @@ public class TestClient { public void run() { - IntStream - .rangeClosed(1,100) - .mapToObj(i ->sendMessage(chatRoom, "Message #" + i)) - .map(result -> result - .map(MessageTo::toString) - .retryWhen(Retry.fixedDelay(10, Duration.ofSeconds(1))) - .block()) - .forEach(result -> log.info("{}", result)); + Flux + .range(1, 100) + .flatMap(i -> Flux + .fromArray(chatRooms) + .map(chatRoom -> sendMessage(chatRoom, "Message #" + i)) + .flatMap(result -> result + .map(MessageTo::toString) + .retryWhen(Retry.fixedDelay(10, Duration.ofSeconds(1))))) + .subscribe(result -> log.info("{}", result)); } private Mono sendMessage( @@ -57,14 +58,14 @@ public class TestClient private final WebClient webClient; - private final ChatRoomInfoTo chatRoom; + private final ChatRoomInfoTo[] chatRooms; private final User user; - TestClient(Integer port, ChatRoomInfoTo chatRoom, String username) + TestClient(Integer port, ChatRoomInfoTo[] chatRooms, String username) { webClient = WebClient.create("http://localhost:" + port); - this.chatRoom = chatRoom; + this.chatRooms = chatRooms; user = new User(username); } } -- 2.20.1