test: HandoverIT-POC - Splitted up code into smaller classes -- MOVE
[demos/kafka/chat] / src / test / java / de / juplo / kafka / chat / backend / StatusTo.java
1 package de.juplo.kafka.chat.backend;
2
3 import de.juplo.kafka.chat.backend.api.ChatRoomInfoTo;
4 import de.juplo.kafka.chat.backend.api.MessageTo;
5 import lombok.EqualsAndHashCode;
6 import lombok.Getter;
7 import lombok.Setter;
8 import lombok.ToString;
9 import lombok.extern.slf4j.Slf4j;
10 import org.awaitility.Awaitility;
11 import org.junit.jupiter.api.BeforeEach;
12 import org.junit.jupiter.api.Test;
13 import org.springframework.core.ParameterizedTypeReference;
14 import org.springframework.http.HttpStatus;
15 import org.springframework.http.MediaType;
16 import org.springframework.http.codec.ServerSentEvent;
17 import org.springframework.web.reactive.function.client.WebClient;
18 import org.testcontainers.containers.BindMode;
19 import org.testcontainers.containers.GenericContainer;
20 import org.testcontainers.containers.Network;
21 import org.testcontainers.containers.output.Slf4jLogConsumer;
22 import org.testcontainers.containers.wait.strategy.Wait;
23 import org.testcontainers.images.ImagePullPolicy;
24 import org.testcontainers.junit.jupiter.Testcontainers;
25 import org.testcontainers.utility.DockerImageName;
26 import reactor.core.publisher.Flux;
27 import reactor.core.publisher.Mono;
28 import reactor.util.retry.Retry;
29
30 import java.io.IOException;
31 import java.time.Duration;
32 import java.util.stream.IntStream;
33
34
35 @Testcontainers
36 @Slf4j
37 public abstract class AbstractHandoverIT
38 {
39   static final ImagePullPolicy NEVER_PULL = imageName -> false;
40   static final ParameterizedTypeReference<ServerSentEvent<String>> SSE_TYPE = new ParameterizedTypeReference<>() {};
41
42
43   @Test
44   void test() throws InterruptedException
45   {
46     ChatRoomInfoTo chatRoom = createChatRoom("bar").block();
47     User user = new User("nerd");
48     IntStream
49         .rangeClosed(1,100)
50         .mapToObj(i ->sendMessage(chatRoom, user, "Message #" + i))
51         .map(result -> result
52             .map(MessageTo::toString)
53             .retryWhen(Retry.fixedDelay(10, Duration.ofSeconds(1)))
54             .block())
55         .forEach(result -> log.info("{}", result));
56
57     receiveMessages(chatRoom)
58         .take(100)
59         .doOnNext(message -> log.info("message: {}", message))
60         .then()
61         .block();
62   }
63
64   Mono<ChatRoomInfoTo> createChatRoom(String name)
65   {
66     return webClient
67         .post()
68         .uri("/create")
69         .contentType(MediaType.TEXT_PLAIN)
70         .bodyValue(name)
71         .accept(MediaType.APPLICATION_JSON)
72         .exchangeToMono(response ->
73         {
74           if (response.statusCode().equals(HttpStatus.OK))
75           {
76             return response.bodyToMono(ChatRoomInfoTo.class);
77           }
78           else
79           {
80             return response.createError();
81           }
82         });
83   }
84
85   Mono<MessageTo> sendMessage(
86       ChatRoomInfoTo chatRoom,
87       User user,
88       String message)
89   {
90     return webClient
91         .put()
92         .uri(
93             "/{chatRoomId}/{username}/{serial}",
94             chatRoom.getId(),
95             user.getName(),
96             user.nextSerial())
97         .contentType(MediaType.TEXT_PLAIN)
98         .accept(MediaType.APPLICATION_JSON)
99         .bodyValue(message)
100         .exchangeToMono(response ->
101         {
102           if (response.statusCode().equals(HttpStatus.OK))
103           {
104             return response.bodyToMono(MessageTo.class);
105           }
106           else
107           {
108             return response.createError();
109           }
110         });
111   }
112
113   Flux<ServerSentEvent<String>> receiveMessages(ChatRoomInfoTo chatRoom)
114   {
115     return webClient
116         .get()
117         .uri(
118             "/{chatRoomId}/listen",
119             chatRoom.getId())
120         .accept(MediaType.TEXT_EVENT_STREAM)
121         .retrieve()
122         .bodyToFlux(SSE_TYPE);
123   }
124
125
126   WebClient webClient;
127
128
129   abstract void setUpExtra() throws IOException, InterruptedException;
130
131   @BeforeEach
132   void setUp() throws Exception
133   {
134     setUpExtra();
135     haproxy.start();
136     backend1.start();
137     // backend2.start();
138     // backend3.start();
139
140     Integer port = haproxy.getMappedPort(8400);
141     webClient = WebClient.create("http://localhost:" + port);
142
143     Awaitility
144         .await()
145         .atMost(Duration.ofMinutes(10))
146         .until(() -> WebClient
147             .create("http://localhost:" + backend1.getMappedPort(8080))
148             .get()
149             .uri("/actuator/health")
150             .exchangeToMono(response ->
151             {
152               if (response.statusCode().equals(HttpStatus.OK))
153               {
154                 return response
155                     .bodyToMono(StatusTo.class)
156                     .map(StatusTo::getStatus)
157                     .map(status -> status.equalsIgnoreCase("UP"));
158               }
159               else
160               {
161                 return Mono.just(false);
162               }
163             })
164             .block());
165
166     haproxy
167         .getDockerClient()
168         .killContainerCmd(haproxy.getContainerId())
169         .withSignal("HUP")
170         .exec();
171
172
173     Awaitility
174         .await()
175         .atMost(Duration.ofMinutes(10))
176         .until(() -> webClient
177             .get()
178             .uri("/actuator/health")
179             .exchangeToMono(response ->
180             {
181               if (response.statusCode().equals(HttpStatus.OK))
182               {
183                 return response
184                     .bodyToMono(StatusTo.class)
185                     .map(StatusTo::getStatus)
186                     .map(status -> status.equalsIgnoreCase("UP"));
187               }
188               else
189               {
190                 return Mono.just(false);
191               }
192             })
193             .block());
194   }
195
196   Network network = Network.newNetwork();
197
198   GenericContainer haproxy =
199       new GenericContainer(DockerImageName.parse("haproxytech/haproxy-debian:2.8"))
200           .withNetwork(network)
201           .withNetworkAliases("haproxy")
202           .withClasspathResourceMapping(
203               "haproxy.cfg",
204               "/usr/local/etc/haproxy/haproxy.cfg",
205               BindMode.READ_ONLY)
206           .withClasspathResourceMapping(
207               "sharding.map",
208               "/usr/local/etc/haproxy/sharding.map",
209               BindMode.READ_WRITE)
210           .withExposedPorts(8400, 8401, 8404)
211           .withLogConsumer(new Slf4jLogConsumer(log, true).withPrefix("HAPROXY"));
212
213   abstract String[] getCommandBackend1();
214   GenericContainer backend1 =
215       new GenericContainer(DockerImageName.parse("juplo/chat-backend:0.0.1-SNAPSHOT"))
216           .withImagePullPolicy(NEVER_PULL)
217           .withNetwork(network)
218           .withNetworkAliases("backend-1")
219           .withCommand(getCommandBackend1())
220           .withExposedPorts(8080)
221           .waitingFor(Wait.forLogMessage(".*Started\\ ChatBackendApplication.*\\n", 1))
222           .withLogConsumer(new Slf4jLogConsumer(log, true).withPrefix("BACKEND-1"));
223
224   abstract String[] getCommandBackend2();
225   GenericContainer backend2 =
226       new GenericContainer(DockerImageName.parse("juplo/chat-backend:0.0.1-SNAPSHOT"))
227           .withImagePullPolicy(NEVER_PULL)
228           .withNetwork(network)
229           .withNetworkAliases("backend-2")
230           .withCommand(getCommandBackend2())
231           .withExposedPorts(8080)
232           .waitingFor(Wait.forLogMessage(".*Started\\ ChatBackendApplication.*\\n", 1))
233           .withLogConsumer(new Slf4jLogConsumer(log, true).withPrefix("BACKEND-2"));
234
235   abstract String[] getCommandBackend3();
236   GenericContainer backend3 =
237       new GenericContainer(DockerImageName.parse("juplo/chat-backend:0.0.1-SNAPSHOT"))
238           .withImagePullPolicy(NEVER_PULL)
239           .withNetwork(network)
240           .withNetworkAliases("backend-3")
241           .withCommand(getCommandBackend3())
242           .withExposedPorts(8080)
243           .waitingFor(Wait.forLogMessage(".*Started\\ ChatBackendApplication.*\\n", 1))
244           .withLogConsumer(new Slf4jLogConsumer(log, true).withPrefix("BACKEND-3"));
245
246
247   @EqualsAndHashCode
248   @ToString
249   class User
250   {
251     @Getter
252     private final String name;
253     private int serial = 0;
254
255
256     User (String name)
257     {
258       this.name = name;
259     }
260
261
262     int nextSerial()
263     {
264       return ++serial;
265     }
266   }
267
268   @Getter
269   @Setter
270   static class StatusTo
271   {
272     String status;
273   }
274 }