WIP:test: `*ConfigurationIT` asserts whether restored messages can be seen
[demos/kafka/chat] / src / test / java / de / juplo / kafka / chat / backend / TestWriter.java
1 package de.juplo.kafka.chat.backend;
2
3 import de.juplo.kafka.chat.backend.api.ChatRoomInfoTo;
4 import de.juplo.kafka.chat.backend.api.MessageTo;
5 import lombok.Getter;
6 import lombok.extern.slf4j.Slf4j;
7 import org.springframework.http.HttpStatus;
8 import org.springframework.http.MediaType;
9 import org.springframework.web.reactive.function.client.WebClient;
10 import org.springframework.web.reactive.function.client.WebClientResponseException;
11 import reactor.core.publisher.Flux;
12 import reactor.core.publisher.Mono;
13 import reactor.core.scheduler.Schedulers;
14 import reactor.util.retry.Retry;
15
16 import java.nio.charset.Charset;
17 import java.time.Duration;
18 import java.util.Iterator;
19 import java.util.LinkedList;
20 import java.util.List;
21 import java.util.concurrent.ThreadLocalRandom;
22
23
24 @Slf4j
25 public class TestWriter
26 {
27   public Mono<Void> run()
28   {
29     return Flux
30         .fromIterable((Iterable<Integer>) () -> new Iterator<>()
31         {
32           private int i = 0;
33
34           @Override
35           public boolean hasNext()
36           {
37             return running;
38           }
39
40           @Override
41           public Integer next()
42           {
43             return i++;
44           }
45         })
46         .delayElements(Duration.ofMillis(ThreadLocalRandom.current().nextLong(500, 1500)))
47         .map(i -> "Message #" + i)
48         .flatMap(message -> sendMessage(chatRoom, message)
49             .retryWhen(Retry.fixedDelay(30, Duration.ofSeconds(1))))
50         .doOnNext(message ->
51         {
52           numSentMessages++;
53           sentMessages.add(message);
54           log.info(
55               "{} sent a message to {}: {}",
56              user,
57              chatRoom,
58              message);
59         })
60         .doOnError(throwable ->
61         {
62           WebClientResponseException e = (WebClientResponseException)throwable.getCause();
63           log.error(
64               "{} failed sending a message: {}",
65               user,
66               e.getResponseBodyAsString(Charset.defaultCharset()));
67         })
68         .limitRate(1)
69         .takeUntil(message -> !running)
70         .doOnComplete(() -> log.info("TestWriter {} is done", user))
71         .parallel(1)
72         .runOn(Schedulers.parallel())
73         .then();
74   }
75
76   private Mono<MessageTo> sendMessage(
77       ChatRoomInfoTo chatRoom,
78       String message)
79   {
80     return webClient
81         .put()
82         .uri(
83             "/{chatRoomId}/{username}/{serial}",
84             chatRoom.getId(),
85             user.getName(),
86             user.nextSerial())
87         .contentType(MediaType.TEXT_PLAIN)
88         .accept(MediaType.APPLICATION_JSON)
89         .bodyValue(message)
90         .exchangeToMono(response ->
91         {
92           if (response.statusCode().equals(HttpStatus.OK))
93           {
94             return response.bodyToMono(MessageTo.class);
95           }
96           else
97           {
98             return response.createError();
99           }
100         });
101   }
102
103
104   private final WebClient webClient;
105
106   final ChatRoomInfoTo chatRoom;
107   final User user;
108   final List<MessageTo> sentMessages = new LinkedList<>();
109
110   volatile boolean running = true;
111   @Getter
112   private volatile int numSentMessages = 0;
113
114
115   TestWriter(Integer port, ChatRoomInfoTo chatRoom, String username)
116   {
117     webClient = WebClient.create("http://localhost:" + port);
118     this.chatRoom = chatRoom;
119     user = new User(username);
120   }
121 }