WIP:test: HandoverIT-POC - splitted up code into smaller classes -- ALIGN
[demos/kafka/chat] / src / test / java / de / juplo / kafka / chat / backend / HandoverIT.java
1 package de.juplo.kafka.chat.backend;
2
3 import de.juplo.kafka.chat.backend.api.ChatRoomInfoTo;
4 import de.juplo.kafka.chat.backend.api.MessageTo;
5 import lombok.EqualsAndHashCode;
6 import lombok.Getter;
7 import lombok.Setter;
8 import lombok.ToString;
9 import lombok.extern.slf4j.Slf4j;
10 import org.awaitility.Awaitility;
11 import org.junit.jupiter.api.BeforeEach;
12 import org.junit.jupiter.api.Test;
13 import org.springframework.core.ParameterizedTypeReference;
14 import org.springframework.http.HttpStatus;
15 import org.springframework.http.MediaType;
16 import org.springframework.http.codec.ServerSentEvent;
17 import org.springframework.web.reactive.function.client.WebClient;
18 import org.testcontainers.containers.BindMode;
19 import org.testcontainers.containers.GenericContainer;
20 import org.testcontainers.containers.Network;
21 import org.testcontainers.containers.output.Slf4jLogConsumer;
22 import org.testcontainers.containers.wait.strategy.Wait;
23 import org.testcontainers.images.ImagePullPolicy;
24 import org.testcontainers.junit.jupiter.Testcontainers;
25 import org.testcontainers.utility.DockerImageName;
26 import reactor.core.publisher.Flux;
27 import reactor.core.publisher.Mono;
28 import reactor.util.retry.Retry;
29
30 import java.io.IOException;
31 import java.time.Duration;
32 import java.util.stream.IntStream;
33
34
35 @Testcontainers
36 @Slf4j
37 public abstract class AbstractHandoverIT
38 {
39   @Test
40   void test() throws InterruptedException
41   {
42     ChatRoomInfoTo chatRoom = createChatRoom("bar").block();
43     User user = new User("nerd");
44     IntStream
45         .rangeClosed(1,100)
46         .mapToObj(i ->sendMessage(chatRoom, user, "Message #" + i))
47         .map(result -> result
48             .map(MessageTo::toString)
49             .retryWhen(Retry.fixedDelay(10, Duration.ofSeconds(1)))
50             .block())
51         .forEach(result -> log.info("{}", result));
52
53     receiveMessages(chatRoom)
54         .take(100)
55         .doOnNext(message -> log.info("message: {}", message))
56         .then()
57         .block();
58   }
59
60
61   abstract void setUpExtra() throws IOException, InterruptedException;
62
63   @BeforeEach
64   void setUp() throws Exception
65   {
66     setUpExtra();
67     haproxy.start();
68     backend1.start();
69     // backend2.start();
70     // backend3.start();
71
72     Awaitility
73         .await()
74         .atMost(Duration.ofMinutes(10))
75         .until(() -> WebClient
76             .create("http://localhost:" + backend1.getMappedPort(8080))
77             .get()
78             .uri("/actuator/health")
79             .exchangeToMono(response ->
80             {
81               if (response.statusCode().equals(HttpStatus.OK))
82               {
83                 return response
84                     .bodyToMono(StatusTo.class)
85                     .map(StatusTo::getStatus)
86                     .map(status -> status.equalsIgnoreCase("UP"));
87               }
88               else
89               {
90                 return Mono.just(false);
91               }
92             })
93             .block());
94
95     haproxy
96         .getDockerClient()
97         .killContainerCmd(haproxy.getContainerId())
98         .withSignal("HUP")
99         .exec();
100
101
102     Awaitility
103         .await()
104         .atMost(Duration.ofMinutes(10))
105         .until(() -> WebClient
106             .create("http://localhost:" + haproxy.getMappedPort(8400))
107             .get()
108             .uri("/actuator/health")
109             .exchangeToMono(response ->
110             {
111               if (response.statusCode().equals(HttpStatus.OK))
112               {
113                 return response
114                     .bodyToMono(StatusTo.class)
115                     .map(StatusTo::getStatus)
116                     .map(status -> status.equalsIgnoreCase("UP"));
117               }
118               else
119               {
120                 return Mono.just(false);
121               }
122             })
123             .block());
124   }
125
126   GenericContainer haproxy;
127
128   abstract String[] getCommandBackend1();
129   GenericContainer backend1;
130   abstract String[] getCommandBackend2();
131   GenericContainer backend2;
132
133   abstract String[] getCommandBackend3();
134   GenericContainer backend3 =
135       new GenericContainer(DockerImageName.parse("juplo/chat-backend:0.0.1-SNAPSHOT"))
136           .withImagePullPolicy(NEVER_PULL)
137           .withNetwork(network)
138           .withNetworkAliases("backend-3")
139           .withCommand(getCommandBackend3())
140           .withExposedPorts(8080)
141           .waitingFor(Wait.forLogMessage(".*Started\\ ChatBackendApplication.*\\n", 1))
142           .withLogConsumer(new Slf4jLogConsumer(log, true).withPrefix("BACKEND-3"));
143
144
145   @EqualsAndHashCode
146   @ToString
147   class User
148   {
149     @Getter
150     private final String name;
151     private int serial = 0;
152
153
154     User (String name)
155     {
156       this.name = name;
157     }
158
159
160     int nextSerial()
161     {
162       return ++serial;
163     }
164   }
165
166   @Getter
167   @Setter
168   static class StatusTo
169   {
170     String status;
171   }
172 }