X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Ftest%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fchat%2Fbackend%2FTestClient.java;h=0d54600d17094e49bc966e143992bdd7017c5683;hb=319a01f87982b8cf00a0b77839b69d161f32d606;hp=5dde9981d8eaa631a4e634ccb5218d544203d535;hpb=99b7a91a74e9c0e064e2592c3364a527a32a2460;p=demos%2Fkafka%2Fchat diff --git a/src/test/java/de/juplo/kafka/chat/backend/TestClient.java b/src/test/java/de/juplo/kafka/chat/backend/TestClient.java index 5dde9981..0d54600d 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/TestClient.java +++ b/src/test/java/de/juplo/kafka/chat/backend/TestClient.java @@ -6,26 +6,49 @@ import lombok.extern.slf4j.Slf4j; import org.springframework.http.HttpStatus; import org.springframework.http.MediaType; import org.springframework.web.reactive.function.client.WebClient; +import org.springframework.web.reactive.function.client.WebClientResponseException; import reactor.core.publisher.Mono; import reactor.util.retry.Retry; +import java.nio.charset.Charset; import java.time.Duration; -import java.util.stream.IntStream; +import java.util.concurrent.ThreadLocalRandom; @Slf4j -public class TestClient +public class TestClient implements Runnable { + @Override public void run() { - IntStream - .rangeClosed(1,100) - .mapToObj(i ->sendMessage(chatRoom, "Message #" + i)) - .map(result -> result - .map(MessageTo::toString) + for (int i = 0; running; i++) + { + String message = "Message #" + i; + for (ChatRoomInfoTo chatRoom : chatRooms) + { + sendMessage(chatRoom, message) .retryWhen(Retry.fixedDelay(10, Duration.ofSeconds(1))) - .block()) - .forEach(result -> log.info("{}", result)); + .map(MessageTo::toString) + .onErrorResume(throwable -> + { + WebClientResponseException e = (WebClientResponseException)throwable.getCause(); + return Mono.just(e.getResponseBodyAsString(Charset.defaultCharset())); + }) + .subscribe(result -> log.info( + "{} sent a message to {}: {}", + user, + chatRoom, + result)); + } + try + { + Thread.sleep(ThreadLocalRandom.current().nextLong(700, 1000)); + } + catch (Exception e) + { + throw new RuntimeException(e); + } + } } private Mono sendMessage( @@ -57,14 +80,16 @@ public class TestClient private final WebClient webClient; - private final ChatRoomInfoTo chatRoom; + private final ChatRoomInfoTo[] chatRooms; private final User user; + volatile boolean running = true; + - TestClient(Integer port, ChatRoomInfoTo chatRoom, String username) + TestClient(Integer port, ChatRoomInfoTo[] chatRooms, String username) { webClient = WebClient.create("http://localhost:" + port); - this.chatRoom = chatRoom; + this.chatRooms = chatRooms; user = new User(username); } }