- ChatRoomInfoTo chatRoom = createChatRoom("bar").block();
- User user = new User("nerd");
- IntStream
- .rangeClosed(1,100)
- .mapToObj(i ->sendMessage(chatRoom, user, "Message #" + i))
- .map(result -> result
- .map(MessageTo::toString)
- .retryWhen(Retry.fixedDelay(10, Duration.ofSeconds(1)))
- .block())
- .forEach(result -> log.info("{}", result));
-
- receiveMessages(chatRoom)
- .take(100)
- .doOnNext(message -> log.info("message: {}", message))
- .then()
- .block();
+ log.info("Starting backend-1...");
+ containers.startBackend(containers.backend1, new TestWriter[0]);
+ log.info("backend-1 started!");
+
+ ChatRoomInfoTo[] chatRooms = Flux
+ .range(0, NUM_CHATROOMS)
+ .flatMap(i -> createChatRoom("room-" + i))
+ .toStream()
+ .toArray(size -> new ChatRoomInfoTo[size]);
+
+ int port = containers.haproxy.getMappedPort(8400);
+
+ CompletableFuture<Void>[] testWriterFutures = new CompletableFuture[NUM_CLIENTS];
+ TestWriter[] testWriters = new TestWriter[NUM_CLIENTS];
+ for (int i = 0; i < NUM_CLIENTS; i++)
+ {
+ TestWriter testWriter = new TestWriter(
+ port,
+ chatRooms[i % NUM_CHATROOMS],
+ "user-" + i);
+ testWriters[i] = testWriter;
+ testWriterFutures[i] = testWriter
+ .run()
+ .toFuture();
+ }
+
+ TestListener testListener = new TestListener(port, chatRooms);
+ testListener
+ .run()
+ .subscribe(message -> log.info(
+ "Received message: {}",
+ message));
+
+ log.info("Starting backend-2...");
+ containers.startBackend(containers.backend2, testWriters);
+ log.info("backend-2 started!");
+
+ for (int i = 0; i < NUM_CLIENTS; i++)
+ {
+ testWriters[i].running = false;
+ testWriterFutures[i].join();
+ log.info("Joined TestWriter {}", testWriters[i].user);
+ }
+
+ Awaitility
+ .await()
+ .atMost(Duration.ofSeconds(30))
+ .untilAsserted(() -> assertAllSentMessagesReceived(testWriters, testListener));
+ }
+
+ private void assertAllSentMessagesReceived(
+ TestWriter[] testWriters,
+ TestListener testListener)
+ {
+ for (int i = 0; i < NUM_CLIENTS; i++)
+ {
+ TestWriter testWriter = testWriters[i];
+ ChatRoomInfoTo chatRoom = testWriter.chatRoom;
+ List<MessageTo> receivedMessages = testListener.receivedMessages.get(chatRoom.getId());
+
+ Assertions.assertThat(receivedMessages
+ .stream()
+ .filter(message -> message.getUser().equals(testWriter.user.getName()))
+ ).containsExactlyElementsOf(testWriter.sentMessages);
+ }