From: Kai Moritz Date: Fri, 1 Mar 2024 12:46:54 +0000 (+0100) Subject: test: HandoverIT-POC - Renamed `TestClient` to `TestWriter` -- MOVE X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=242cd133691ba99ea3b0e9afb6edae59f6bb00d5;p=demos%2Fkafka%2Fchat test: HandoverIT-POC - Renamed `TestClient` to `TestWriter` -- MOVE --- diff --git a/src/test/java/de/juplo/kafka/chat/backend/TestClient.java b/src/test/java/de/juplo/kafka/chat/backend/TestClient.java deleted file mode 100644 index 0d54600d..00000000 --- a/src/test/java/de/juplo/kafka/chat/backend/TestClient.java +++ /dev/null @@ -1,95 +0,0 @@ -package de.juplo.kafka.chat.backend; - -import de.juplo.kafka.chat.backend.api.ChatRoomInfoTo; -import de.juplo.kafka.chat.backend.api.MessageTo; -import lombok.extern.slf4j.Slf4j; -import org.springframework.http.HttpStatus; -import org.springframework.http.MediaType; -import org.springframework.web.reactive.function.client.WebClient; -import org.springframework.web.reactive.function.client.WebClientResponseException; -import reactor.core.publisher.Mono; -import reactor.util.retry.Retry; - -import java.nio.charset.Charset; -import java.time.Duration; -import java.util.concurrent.ThreadLocalRandom; - - -@Slf4j -public class TestClient implements Runnable -{ - @Override - public void run() - { - for (int i = 0; running; i++) - { - String message = "Message #" + i; - for (ChatRoomInfoTo chatRoom : chatRooms) - { - sendMessage(chatRoom, message) - .retryWhen(Retry.fixedDelay(10, Duration.ofSeconds(1))) - .map(MessageTo::toString) - .onErrorResume(throwable -> - { - WebClientResponseException e = (WebClientResponseException)throwable.getCause(); - return Mono.just(e.getResponseBodyAsString(Charset.defaultCharset())); - }) - .subscribe(result -> log.info( - "{} sent a message to {}: {}", - user, - chatRoom, - result)); - } - try - { - Thread.sleep(ThreadLocalRandom.current().nextLong(700, 1000)); - } - catch (Exception e) - { - throw new RuntimeException(e); - } - } - } - - private Mono sendMessage( - ChatRoomInfoTo chatRoom, - String message) - { - return webClient - .put() - .uri( - "/{chatRoomId}/{username}/{serial}", - chatRoom.getId(), - user.getName(), - user.nextSerial()) - .contentType(MediaType.TEXT_PLAIN) - .accept(MediaType.APPLICATION_JSON) - .bodyValue(message) - .exchangeToMono(response -> - { - if (response.statusCode().equals(HttpStatus.OK)) - { - return response.bodyToMono(MessageTo.class); - } - else - { - return response.createError(); - } - }); - } - - - private final WebClient webClient; - private final ChatRoomInfoTo[] chatRooms; - private final User user; - - volatile boolean running = true; - - - TestClient(Integer port, ChatRoomInfoTo[] chatRooms, String username) - { - webClient = WebClient.create("http://localhost:" + port); - this.chatRooms = chatRooms; - user = new User(username); - } -} diff --git a/src/test/java/de/juplo/kafka/chat/backend/TestWriter.java b/src/test/java/de/juplo/kafka/chat/backend/TestWriter.java new file mode 100644 index 00000000..0d54600d --- /dev/null +++ b/src/test/java/de/juplo/kafka/chat/backend/TestWriter.java @@ -0,0 +1,95 @@ +package de.juplo.kafka.chat.backend; + +import de.juplo.kafka.chat.backend.api.ChatRoomInfoTo; +import de.juplo.kafka.chat.backend.api.MessageTo; +import lombok.extern.slf4j.Slf4j; +import org.springframework.http.HttpStatus; +import org.springframework.http.MediaType; +import org.springframework.web.reactive.function.client.WebClient; +import org.springframework.web.reactive.function.client.WebClientResponseException; +import reactor.core.publisher.Mono; +import reactor.util.retry.Retry; + +import java.nio.charset.Charset; +import java.time.Duration; +import java.util.concurrent.ThreadLocalRandom; + + +@Slf4j +public class TestClient implements Runnable +{ + @Override + public void run() + { + for (int i = 0; running; i++) + { + String message = "Message #" + i; + for (ChatRoomInfoTo chatRoom : chatRooms) + { + sendMessage(chatRoom, message) + .retryWhen(Retry.fixedDelay(10, Duration.ofSeconds(1))) + .map(MessageTo::toString) + .onErrorResume(throwable -> + { + WebClientResponseException e = (WebClientResponseException)throwable.getCause(); + return Mono.just(e.getResponseBodyAsString(Charset.defaultCharset())); + }) + .subscribe(result -> log.info( + "{} sent a message to {}: {}", + user, + chatRoom, + result)); + } + try + { + Thread.sleep(ThreadLocalRandom.current().nextLong(700, 1000)); + } + catch (Exception e) + { + throw new RuntimeException(e); + } + } + } + + private Mono sendMessage( + ChatRoomInfoTo chatRoom, + String message) + { + return webClient + .put() + .uri( + "/{chatRoomId}/{username}/{serial}", + chatRoom.getId(), + user.getName(), + user.nextSerial()) + .contentType(MediaType.TEXT_PLAIN) + .accept(MediaType.APPLICATION_JSON) + .bodyValue(message) + .exchangeToMono(response -> + { + if (response.statusCode().equals(HttpStatus.OK)) + { + return response.bodyToMono(MessageTo.class); + } + else + { + return response.createError(); + } + }); + } + + + private final WebClient webClient; + private final ChatRoomInfoTo[] chatRooms; + private final User user; + + volatile boolean running = true; + + + TestClient(Integer port, ChatRoomInfoTo[] chatRooms, String username) + { + webClient = WebClient.create("http://localhost:" + port); + this.chatRooms = chatRooms; + user = new User(username); + } +}