WIP:refactor: Refined channel-states, introduced `ChannelState` -- ALIGN
[demos/kafka/chat] / src / test / java / de / juplo / kafka / chat / backend / TestWriter.java
1 package de.juplo.kafka.chat.backend;
2
3 import de.juplo.kafka.chat.backend.api.ChatRoomInfoTo;
4 import de.juplo.kafka.chat.backend.api.MessageTo;
5 import lombok.extern.slf4j.Slf4j;
6 import org.springframework.http.HttpStatus;
7 import org.springframework.http.MediaType;
8 import org.springframework.web.reactive.function.client.WebClient;
9 import org.springframework.web.reactive.function.client.WebClientResponseException;
10 import reactor.core.publisher.Flux;
11 import reactor.core.publisher.Mono;
12 import reactor.core.scheduler.Schedulers;
13 import reactor.util.retry.Retry;
14
15 import java.nio.charset.Charset;
16 import java.time.Duration;
17 import java.util.Iterator;
18 import java.util.LinkedList;
19 import java.util.List;
20 import java.util.concurrent.ThreadLocalRandom;
21
22
23 @Slf4j
24 public class TestWriter
25 {
26   public Mono<Void> run()
27   {
28     return Flux
29         .fromIterable((Iterable<Integer>) () -> new Iterator<>()
30         {
31           private int i = 0;
32
33           @Override
34           public boolean hasNext()
35           {
36             return running;
37           }
38
39           @Override
40           public Integer next()
41           {
42             return i++;
43           }
44         })
45         .delayElements(Duration.ofMillis(ThreadLocalRandom.current().nextLong(500, 1500)))
46         .map(i -> "Message #" + i)
47         .flatMap(message -> sendMessage(chatRoom, message)
48             .retryWhen(Retry.backoff(10, Duration.ofSeconds(1))))
49         .doOnNext(message ->
50         {
51           sentMessages.add(message);
52           log.info(
53               "{} sent a message to {}: {}",
54              user,
55              chatRoom,
56              message);
57         })
58         .doOnError(throwable ->
59         {
60           WebClientResponseException e = (WebClientResponseException)throwable.getCause();
61           log.error(
62               "{} failed sending a message: {}",
63               user,
64               e.getResponseBodyAsString(Charset.defaultCharset()));
65         })
66         .limitRate(1)
67         .takeUntil(message -> !running)
68         .doOnComplete(() -> log.info("TestWriter {} is done", user))
69         .parallel(1)
70         .runOn(Schedulers.parallel())
71         .then();
72   }
73
74   private Mono<MessageTo> sendMessage(
75       ChatRoomInfoTo chatRoom,
76       String message)
77   {
78     return webClient
79         .put()
80         .uri(
81             "/{chatRoomId}/{username}/{serial}",
82             chatRoom.getId(),
83             user.getName(),
84             user.nextSerial())
85         .contentType(MediaType.TEXT_PLAIN)
86         .accept(MediaType.APPLICATION_JSON)
87         .bodyValue(message)
88         .exchangeToMono(response ->
89         {
90           if (response.statusCode().equals(HttpStatus.OK))
91           {
92             return response.bodyToMono(MessageTo.class);
93           }
94           else
95           {
96             return response.createError();
97           }
98         });
99   }
100
101
102   private final WebClient webClient;
103
104   final ChatRoomInfoTo chatRoom;
105   final User user;
106   final List<MessageTo> sentMessages = new LinkedList<>();
107
108   volatile boolean running = true;
109
110
111   TestWriter(Integer port, ChatRoomInfoTo chatRoom, String username)
112   {
113     webClient = WebClient.create("http://localhost:" + port);
114     this.chatRoom = chatRoom;
115     user = new User(username);
116   }
117 }