.range(0, NUM_CLIENTS)
.map(i -> new TestWriter(
port,
- chatRooms,
+ chatRooms[i % NUM_CHATROOMS],
"user-" + Integer.toString(i)))
.doOnNext(testClient -> executorService.execute(testClient))
.toStream()
Flux
.fromArray(chatRooms)
- .flatMap(chatRoom ->receiveMessages(chatRoom).take(50))
+ .flatMap(chatRoom ->receiveMessages(chatRoom))
.doOnNext(message -> log.info("message: {}", message))
+ .take(50)
.then()
.block();
}
for (int i = 0; running; i++)
{
String message = "Message #" + i;
- for (ChatRoomInfoTo chatRoom : chatRooms)
+ try
{
sendMessage(chatRoom, message)
.retryWhen(Retry.fixedDelay(10, Duration.ofSeconds(1)))
user,
chatRoom,
result));
- }
- try
- {
+
Thread.sleep(ThreadLocalRandom.current().nextLong(700, 1000));
}
catch (Exception e)
private final WebClient webClient;
- private final ChatRoomInfoTo[] chatRooms;
+ private final ChatRoomInfoTo chatRoom;
private final User user;
volatile boolean running = true;
- TestWriter(Integer port, ChatRoomInfoTo[] chatRooms, String username)
+ TestWriter(Integer port, ChatRoomInfoTo chatRoom, String username)
{
webClient = WebClient.create("http://localhost:" + port);
- this.chatRooms = chatRooms;
+ this.chatRoom = chatRoom;
user = new User(username);
}
}