test: HandoverIT-POC - Refactored `TestWriter` to use a `Flux`
[demos/kafka/chat] / src / test / java / de / juplo / kafka / chat / backend / TestWriter.java
1 package de.juplo.kafka.chat.backend;
2
3 import de.juplo.kafka.chat.backend.api.ChatRoomInfoTo;
4 import de.juplo.kafka.chat.backend.api.MessageTo;
5 import lombok.extern.slf4j.Slf4j;
6 import org.springframework.http.HttpStatus;
7 import org.springframework.http.MediaType;
8 import org.springframework.web.reactive.function.client.WebClient;
9 import org.springframework.web.reactive.function.client.WebClientResponseException;
10 import reactor.core.publisher.Flux;
11 import reactor.core.publisher.Mono;
12 import reactor.util.retry.Retry;
13
14 import java.nio.charset.Charset;
15 import java.time.Duration;
16 import java.util.Iterator;
17 import java.util.concurrent.ThreadLocalRandom;
18
19
20 @Slf4j
21 public class TestWriter implements Runnable
22 {
23   @Override
24   public void run()
25   {
26     Flux
27         .fromIterable((Iterable<Integer>) () -> new Iterator<>()
28         {
29           private int i = 0;
30
31           @Override
32           public boolean hasNext()
33           {
34             return running;
35           }
36
37           @Override
38           public Integer next()
39           {
40             return i++;
41           }
42         })
43         .map(i -> "Message #" + i)
44         .flatMap(message -> sendMessage(chatRoom, message)
45             .delayElement(Duration.ofMillis(ThreadLocalRandom.current().nextLong(500, 1500)))
46             .retryWhen(Retry.fixedDelay(10, Duration.ofSeconds(1))))
47         .doOnNext(message -> log.info(
48             "{} sent a message to {}: {}",
49             user,
50             chatRoom,
51             message))
52         .doOnError(throwable ->
53         {
54           WebClientResponseException e = (WebClientResponseException)throwable.getCause();
55           log.error(
56               "{} failed sending a message: {}",
57               user,
58               e.getResponseBodyAsString(Charset.defaultCharset()));
59         })
60         .then()
61         .block();
62   }
63
64   private Mono<MessageTo> sendMessage(
65       ChatRoomInfoTo chatRoom,
66       String message)
67   {
68     return webClient
69         .put()
70         .uri(
71             "/{chatRoomId}/{username}/{serial}",
72             chatRoom.getId(),
73             user.getName(),
74             user.nextSerial())
75         .contentType(MediaType.TEXT_PLAIN)
76         .accept(MediaType.APPLICATION_JSON)
77         .bodyValue(message)
78         .exchangeToMono(response ->
79         {
80           if (response.statusCode().equals(HttpStatus.OK))
81           {
82             return response.bodyToMono(MessageTo.class);
83           }
84           else
85           {
86             return response.createError();
87           }
88         });
89   }
90
91
92   private final WebClient webClient;
93   private final ChatRoomInfoTo chatRoom;
94   private final User user;
95
96   volatile boolean running = true;
97
98
99   TestWriter(Integer port, ChatRoomInfoTo chatRoom, String username)
100   {
101     webClient = WebClient.create("http://localhost:" + port);
102     this.chatRoom = chatRoom;
103     user = new User(username);
104   }
105 }