X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Ftest%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fchat%2Fbackend%2FTestClient.java;h=0d54600d17094e49bc966e143992bdd7017c5683;hb=319a01f87982b8cf00a0b77839b69d161f32d606;hp=4aad7f81dd09af1da1ec95ce9b4e34077e1e840e;hpb=3a55ee828e17f4a615e6362b0256f5c4d7894a0a;p=demos%2Fkafka%2Fchat diff --git a/src/test/java/de/juplo/kafka/chat/backend/TestClient.java b/src/test/java/de/juplo/kafka/chat/backend/TestClient.java index 4aad7f81..0d54600d 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/TestClient.java +++ b/src/test/java/de/juplo/kafka/chat/backend/TestClient.java @@ -6,27 +6,49 @@ import lombok.extern.slf4j.Slf4j; import org.springframework.http.HttpStatus; import org.springframework.http.MediaType; import org.springframework.web.reactive.function.client.WebClient; -import reactor.core.publisher.Flux; +import org.springframework.web.reactive.function.client.WebClientResponseException; import reactor.core.publisher.Mono; import reactor.util.retry.Retry; +import java.nio.charset.Charset; import java.time.Duration; +import java.util.concurrent.ThreadLocalRandom; @Slf4j -public class TestClient +public class TestClient implements Runnable { + @Override public void run() { - Flux - .range(1, 100) - .flatMap(i -> Flux - .fromArray(chatRooms) - .map(chatRoom -> sendMessage(chatRoom, "Message #" + i)) - .flatMap(result -> result - .map(MessageTo::toString) - .retryWhen(Retry.fixedDelay(10, Duration.ofSeconds(1))))) - .subscribe(result -> log.info("{}", result)); + for (int i = 0; running; i++) + { + String message = "Message #" + i; + for (ChatRoomInfoTo chatRoom : chatRooms) + { + sendMessage(chatRoom, message) + .retryWhen(Retry.fixedDelay(10, Duration.ofSeconds(1))) + .map(MessageTo::toString) + .onErrorResume(throwable -> + { + WebClientResponseException e = (WebClientResponseException)throwable.getCause(); + return Mono.just(e.getResponseBodyAsString(Charset.defaultCharset())); + }) + .subscribe(result -> log.info( + "{} sent a message to {}: {}", + user, + chatRoom, + result)); + } + try + { + Thread.sleep(ThreadLocalRandom.current().nextLong(700, 1000)); + } + catch (Exception e) + { + throw new RuntimeException(e); + } + } } private Mono sendMessage( @@ -61,6 +83,8 @@ public class TestClient private final ChatRoomInfoTo[] chatRooms; private final User user; + volatile boolean running = true; + TestClient(Integer port, ChatRoomInfoTo[] chatRooms, String username) {