- TestClient testClient = new TestClient(
- containers.haproxy.getMappedPort(8400),
- chatRooms,
- "nerd");
- testClient.run();
-
- Flux
- .fromArray(chatRooms)
- .flatMap(chatRoom ->receiveMessages(chatRoom).take(100))
- .doOnNext(message -> log.info("message: {}", message))
- .then()
- .block();
+ int port = containers.haproxy.getMappedPort(8400);
+
+ CompletableFuture<Void>[] testWriterFutures = new CompletableFuture[NUM_CLIENTS];
+ TestWriter[] testWriters = new TestWriter[NUM_CLIENTS];
+ for (int i = 0; i < NUM_CLIENTS; i++)
+ {
+ TestWriter testWriter = new TestWriter(
+ port,
+ chatRooms[i % NUM_CHATROOMS],
+ "user-" + i);
+ testWriters[i] = testWriter;
+ testWriterFutures[i] = testWriter
+ .run()
+ .toFuture();
+ }
+
+ TestListener testListener = new TestListener(port, chatRooms);
+ testListener
+ .run()
+ .subscribe(message -> log.info(
+ "Received message: {}",
+ message));
+
+ log.info("Sleeping for 3 seconds...");
+ Thread.sleep(3000);
+
+ for (int i = 0; i < NUM_CLIENTS; i++)
+ {
+ testWriters[i].running = false;
+ testWriterFutures[i].join();
+ log.info("Joined TestWriter {}", testWriters[i].user);
+ }