1 package de.juplo.kafka.chat.backend;
3 import de.juplo.kafka.chat.backend.api.ChatRoomInfoTo;
4 import de.juplo.kafka.chat.backend.api.MessageTo;
6 import lombok.extern.slf4j.Slf4j;
7 import org.springframework.http.HttpStatus;
8 import org.springframework.http.MediaType;
9 import org.springframework.web.reactive.function.client.WebClient;
10 import org.springframework.web.reactive.function.client.WebClientResponseException;
11 import reactor.core.publisher.Flux;
12 import reactor.core.publisher.Mono;
13 import reactor.core.scheduler.Schedulers;
14 import reactor.util.retry.Retry;
16 import java.nio.charset.Charset;
17 import java.time.Duration;
18 import java.util.Iterator;
19 import java.util.LinkedList;
20 import java.util.List;
21 import java.util.concurrent.ThreadLocalRandom;
// Test helper that acts as one chat user: it sends the texts "Message #<i>"
// to a single chat room through a WebClient and keeps the MessageTo replies
// it gets back in sentMessages.
25 public class TestWriter
/**
 * Assembles the reactive pipeline that sends numbered messages to the chat room.
 *
 * Visible stages: integers taken from a hand-written Iterator are delayed,
 * turned into the texts "Message #<i>", posted through sendMessage (with a
 * fixed-delay retry per message), recorded in sentMessages and logged. The
 * stream is cut off once the running flag has been switched off.
 *
 * @return a Mono<Void>; presumably it completes when the pipeline terminates,
 *         but the terminal operator is not visible in this excerpt (TODO confirm)
 */
27 public Mono<Void> run()
// Integer source backed by an anonymous Iterator; the bodies of its
// hasNext()/next() methods are not visible in this excerpt.
30 .fromIterable((Iterable<Integer>) () -> new Iterator<>()
35 public boolean hasNext()
// NOTE: the random value is drawn once, when this expression is evaluated
// during assembly. One call of run() therefore uses a single fixed delay in
// the range 500..1499 ms for all elements, not a fresh delay per message.
46 .delayElements(Duration.ofMillis(ThreadLocalRandom.current().nextLong(500, 1500)))
47 .map(i -> "Message #" + i)
// The retry is attached to the Mono of each individual send:
// up to 30 retries, one second apart.
48 .flatMap(message -> sendMessage(chatRoom, message)
49 .retryWhen(Retry.fixedDelay(30, Duration.ofSeconds(1))))
// Record the reply. sentMessages is a plain LinkedList, which is not
// thread-safe. NOTE(review): confirm it is never touched concurrently.
53 sentMessages.add(message);
55 "{} sent a message to {}: {}",
60 .doOnError(throwable ->
// NOTE(review): unchecked cast of the cause. If the error does not wrap a
// WebClientResponseException this handler itself fails: ClassCastException
// for another cause type, NullPointerException two lines below for no cause.
// Presumably this relies on the retry operator wrapping the last failure
// once the retries are exhausted (confirm).
62 WebClientResponseException e = (WebClientResponseException)throwable.getCause();
64 "{} failed sending a message: {}",
// The response body is decoded with the platform default charset.
66 e.getResponseBodyAsString(Charset.defaultCharset()));
// Ends the stream once running has been set to false.
69 .takeUntil(message -> !running)
70 .doOnComplete(() -> log.info("TestWriter {} is done", user))
// runOn is an operator of ParallelFlux, so a parallel(...) call presumably
// sits on a line that is not shown here (TODO confirm).
72 .runOn(Schedulers.parallel())
/**
 * Sends one request to the chat backend through webClient and decodes the reply.
 *
 * Only part of the signature and of the request setup is visible in this
 * excerpt; the HTTP verb, the remaining parameters and the values bound to
 * the URI variables are on lines that are not shown.
 *
 * @param chatRoom the chat room the message is addressed to
 * @return a Mono that emits the MessageTo decoded from the response body when
 *         the status is 200 OK, and otherwise the error Mono produced by
 *         response.createError()
 */
76 private Mono<MessageTo> sendMessage(
77 ChatRoomInfoTo chatRoom,
// URI template with three path variables: chat room id, user name, serial.
83 "/{chatRoomId}/{username}/{serial}",
// The request body is declared as plain text; a JSON reply is requested.
87 .contentType(MediaType.TEXT_PLAIN)
88 .accept(MediaType.APPLICATION_JSON)
90 .exchangeToMono(response ->
// Only an exact 200 OK counts as success; other 2xx codes do not.
92 if (response.statusCode().equals(HttpStatus.OK))
94 return response.bodyToMono(MessageTo.class);
// Any other status is turned into an error signal (presumably the else
// branch; the surrounding braces are not visible here).
98 return response.createError();
// HTTP client; created in the constructor for http://localhost:<port>.
104 private final WebClient webClient;
// The chat room this writer sends to; assigned in the constructor.
106 final ChatRoomInfoTo chatRoom;
// Replies received for successfully sent messages, appended in run().
// LinkedList is not thread-safe. NOTE(review): confirm readers do not
// access it while the pipeline is still running.
108 final List<MessageTo> sentMessages = new LinkedList<>();
// Stop flag read by the pipeline in run(); volatile so that a change made by
// another thread becomes visible. Presumably set to false by the test code.
110 volatile boolean running = true;
// Counter of sent messages; the place where it is updated is not visible in
// this excerpt. NOTE(review): volatile only guarantees visibility. If it is
// incremented with ++ from several threads, updates can be lost.
112 private volatile int numSentMessages = 0;
/**
 * Creates a writer that talks to a backend on localhost.
 *
 * @param port     port appended to "http://localhost:" to form the base URL
 *                 of the WebClient; a null value is not rejected here and
 *                 would yield the literal URL "http://localhost:null"
 * @param chatRoom the chat room the messages are sent to
 * @param username name handed to the User constructor (User is declared
 *                 outside this excerpt)
 */
115 TestWriter(Integer port, ChatRoomInfoTo chatRoom, String username)
117 webClient = WebClient.create("http://localhost:" + port);
118 this.chatRoom = chatRoom;
119 user = new User(username);