1 package de.juplo.kafka.chat.backend;
3 import de.juplo.kafka.chat.backend.api.ChatRoomInfoTo;
4 import de.juplo.kafka.chat.backend.api.MessageTo;
5 import lombok.extern.slf4j.Slf4j;
6 import org.springframework.http.HttpStatus;
7 import org.springframework.http.MediaType;
8 import org.springframework.web.reactive.function.client.WebClient;
9 import org.springframework.web.reactive.function.client.WebClientResponseException;
10 import reactor.core.publisher.Flux;
11 import reactor.core.publisher.Mono;
12 import reactor.util.retry.Retry;
14 import java.nio.charset.Charset;
15 import java.time.Duration;
16 import java.util.Iterator;
17 import java.util.LinkedList;
18 import java.util.List;
19 import java.util.concurrent.ThreadLocalRandom;
/**
 * {@link Runnable} test helper that sends chat messages through a
 * {@link WebClient} and keeps the results in {@code sentMessages}.
 * <p>
 * NOTE(review): only fragments of this class are visible in this view; this
 * summary is based on those fragments and should be confirmed against the
 * full file.
 */
23 public class TestWriter implements Runnable
// Pipeline source: integers produced by an anonymous Iterator.
// NOTE(review): the iterator body is not visible here - presumably hasNext()
// is tied to the `running` flag; confirm.
29 .fromIterable((Iterable<Integer>) () -> new Iterator<>()
34 public boolean hasNext()
// Turn every integer i into the text "Message #<i>".
45 .map(i -> "Message #" + i)
// Send each text to the chat room via sendMessage(...).
46 .flatMap(message -> sendMessage(chatRoom, message)
// Delay the result of each send by a random duration in [500, 1500) ms.
47 .delayElement(Duration.ofMillis(ThreadLocalRandom.current().nextLong(500, 1500)))
// Retry a failed send up to 10 times with a fixed 1 second pause between attempts.
48 .retryWhen(Retry.fixedDelay(10, Duration.ofSeconds(1))))
// Record the item in sentMessages.
51 sentMessages.add(message);
53 "{} sent a message to {}: {}",
// Error callback of the pipeline.
58 .doOnError(throwable ->
// NOTE(review): unchecked cast - this throws ClassCastException if the cause is
// not a WebClientResponseException, and the dereference of `e` below throws
// NullPointerException if the throwable has no cause. Consider an instanceof check.
60 WebClientResponseException e = (WebClientResponseException)throwable.getCause();
62 "{} failed sending a message: {}",
// The response body is decoded with the JVM default charset.
64 e.getResponseBodyAsString(Charset.defaultCharset()));
/**
 * Issues one HTTP request for a message in the given chat room and maps the
 * response to a {@link MessageTo}.
 * <p>
 * A response with status {@code 200 OK} is decoded into a {@code MessageTo};
 * any other status is converted into an error signal via
 * {@code response.createError()}.
 * <p>
 * NOTE(review): the HTTP method, the remaining parameters and the values bound
 * to the URI variables are not visible in this view.
 *
 * @param chatRoom the target chat room (presumably the source of the
 *                 {@code chatRoomId} URI variable - confirm)
 * @return a {@code Mono} emitting the decoded {@code MessageTo}, or an error
 *         for any non-OK response
 */
70 private Mono<MessageTo> sendMessage(
71 ChatRoomInfoTo chatRoom,
// URI template with three path variables: chatRoomId, username and serial.
77 "/{chatRoomId}/{username}/{serial}",
// The request body is plain text; a JSON response is requested.
81 .contentType(MediaType.TEXT_PLAIN)
82 .accept(MediaType.APPLICATION_JSON)
84 .exchangeToMono(response ->
// Only an exact 200 OK is treated as success.
86 if (response.statusCode().equals(HttpStatus.OK))
88 return response.bodyToMono(MessageTo.class);
// Every other status becomes an error signal.
92 return response.createError();
// HTTP client; created in the constructor for http://localhost:<port>.
98 private final WebClient webClient;
// Chat room passed to the constructor; handed to sendMessage(...).
99 private final ChatRoomInfoTo chatRoom;
// User created in the constructor from the given username.
100 private final User user;

// Package-private list of recorded messages.
// NOTE(review): LinkedList is not thread-safe - confirm that it is only read
// after the writer has finished, or that access is otherwise confined.
102 final List<MessageTo> sentMessages = new LinkedList<>();

// Package-private volatile flag, initially true.
// NOTE(review): presumably cleared from outside to stop the writer; the code
// that reads or writes it is not visible in this view.
104 volatile boolean running = true;
/**
 * Creates a writer that talks to a server on {@code localhost}.
 *
 * @param port     port appended to {@code http://localhost:} to form the
 *                 base URL of the {@link WebClient}
 * @param chatRoom chat room stored for later use by this writer
 * @param username name wrapped in a new {@code User}
 */
107 TestWriter(Integer port, ChatRoomInfoTo chatRoom, String username)
109 webClient = WebClient.create("http://localhost:" + port);
110 this.chatRoom = chatRoom;
111 user = new User(username);