test: HandoverIT-POC - Split up code into smaller classes -- MOVE
[demos/kafka/chat] / src / test / java / de / juplo / kafka / chat / backend / AbstractHandoverITContainers.java
1 package de.juplo.kafka.chat.backend;
2
3 import de.juplo.kafka.chat.backend.api.ChatRoomInfoTo;
4 import de.juplo.kafka.chat.backend.api.MessageTo;
5 import lombok.EqualsAndHashCode;
6 import lombok.Getter;
7 import lombok.Setter;
8 import lombok.ToString;
9 import lombok.extern.slf4j.Slf4j;
10 import org.awaitility.Awaitility;
11 import org.junit.jupiter.api.BeforeEach;
12 import org.junit.jupiter.api.Test;
13 import org.springframework.core.ParameterizedTypeReference;
14 import org.springframework.http.HttpStatus;
15 import org.springframework.http.MediaType;
16 import org.springframework.http.codec.ServerSentEvent;
17 import org.springframework.web.reactive.function.client.WebClient;
18 import org.testcontainers.containers.BindMode;
19 import org.testcontainers.containers.GenericContainer;
20 import org.testcontainers.containers.Network;
21 import org.testcontainers.containers.output.Slf4jLogConsumer;
22 import org.testcontainers.containers.wait.strategy.Wait;
23 import org.testcontainers.images.ImagePullPolicy;
24 import org.testcontainers.junit.jupiter.Testcontainers;
25 import org.testcontainers.utility.DockerImageName;
26 import reactor.core.publisher.Flux;
27 import reactor.core.publisher.Mono;
28 import reactor.util.retry.Retry;
29
30 import java.io.IOException;
31 import java.time.Duration;
32 import java.util.stream.IntStream;
33
34
35 @Testcontainers
36 @Slf4j
37 public abstract class AbstractHandoverIT
38 {
39   static final ImagePullPolicy NEVER_PULL = imageName -> false;
40
41
42   @Test
43   void test() throws InterruptedException
44   {
45     ChatRoomInfoTo chatRoom = createChatRoom("bar").block();
46     User user = new User("nerd");
47     IntStream
48         .rangeClosed(1,100)
49         .mapToObj(i ->sendMessage(chatRoom, user, "Message #" + i))
50         .map(result -> result
51             .map(MessageTo::toString)
52             .retryWhen(Retry.fixedDelay(10, Duration.ofSeconds(1)))
53             .block())
54         .forEach(result -> log.info("{}", result));
55
56     receiveMessages(chatRoom)
57         .take(100)
58         .doOnNext(message -> log.info("message: {}", message))
59         .then()
60         .block();
61   }
62
63
64   abstract void setUpExtra() throws IOException, InterruptedException;
65
66   @BeforeEach
67   void setUp() throws Exception
68   {
69     setUpExtra();
70     haproxy.start();
71     backend1.start();
72     // backend2.start();
73     // backend3.start();
74
75     Awaitility
76         .await()
77         .atMost(Duration.ofMinutes(10))
78         .until(() -> WebClient
79             .create("http://localhost:" + backend1.getMappedPort(8080))
80             .get()
81             .uri("/actuator/health")
82             .exchangeToMono(response ->
83             {
84               if (response.statusCode().equals(HttpStatus.OK))
85               {
86                 return response
87                     .bodyToMono(StatusTo.class)
88                     .map(StatusTo::getStatus)
89                     .map(status -> status.equalsIgnoreCase("UP"));
90               }
91               else
92               {
93                 return Mono.just(false);
94               }
95             })
96             .block());
97
98     haproxy
99         .getDockerClient()
100         .killContainerCmd(haproxy.getContainerId())
101         .withSignal("HUP")
102         .exec();
103
104
105     Awaitility
106         .await()
107         .atMost(Duration.ofMinutes(10))
108         .until(() -> WebClient
109             .create("http://localhost:" + haproxy.getMappedPort(8400))
110             .get()
111             .uri("/actuator/health")
112             .exchangeToMono(response ->
113             {
114               if (response.statusCode().equals(HttpStatus.OK))
115               {
116                 return response
117                     .bodyToMono(StatusTo.class)
118                     .map(StatusTo::getStatus)
119                     .map(status -> status.equalsIgnoreCase("UP"));
120               }
121               else
122               {
123                 return Mono.just(false);
124               }
125             })
126             .block());
127   }
128
129   Network network = Network.newNetwork();
130
131   GenericContainer haproxy =
132       new GenericContainer(DockerImageName.parse("haproxytech/haproxy-debian:2.8"))
133           .withNetwork(network)
134           .withNetworkAliases("haproxy")
135           .withClasspathResourceMapping(
136               "haproxy.cfg",
137               "/usr/local/etc/haproxy/haproxy.cfg",
138               BindMode.READ_ONLY)
139           .withClasspathResourceMapping(
140               "sharding.map",
141               "/usr/local/etc/haproxy/sharding.map",
142               BindMode.READ_WRITE)
143           .withExposedPorts(8400, 8401, 8404)
144           .withLogConsumer(new Slf4jLogConsumer(log, true).withPrefix("HAPROXY"));
145
146   abstract String[] getCommandBackend1();
147   GenericContainer backend1 =
148       new GenericContainer(DockerImageName.parse("juplo/chat-backend:0.0.1-SNAPSHOT"))
149           .withImagePullPolicy(NEVER_PULL)
150           .withNetwork(network)
151           .withNetworkAliases("backend-1")
152           .withCommand(getCommandBackend1())
153           .withExposedPorts(8080)
154           .waitingFor(Wait.forLogMessage(".*Started\\ ChatBackendApplication.*\\n", 1))
155           .withLogConsumer(new Slf4jLogConsumer(log, true).withPrefix("BACKEND-1"));
156
157   abstract String[] getCommandBackend2();
158   GenericContainer backend2 =
159       new GenericContainer(DockerImageName.parse("juplo/chat-backend:0.0.1-SNAPSHOT"))
160           .withImagePullPolicy(NEVER_PULL)
161           .withNetwork(network)
162           .withNetworkAliases("backend-2")
163           .withCommand(getCommandBackend2())
164           .withExposedPorts(8080)
165           .waitingFor(Wait.forLogMessage(".*Started\\ ChatBackendApplication.*\\n", 1))
166           .withLogConsumer(new Slf4jLogConsumer(log, true).withPrefix("BACKEND-2"));
167
168   abstract String[] getCommandBackend3();
169   GenericContainer backend3 =
170       new GenericContainer(DockerImageName.parse("juplo/chat-backend:0.0.1-SNAPSHOT"))
171           .withImagePullPolicy(NEVER_PULL)
172           .withNetwork(network)
173           .withNetworkAliases("backend-3")
174           .withCommand(getCommandBackend3())
175           .withExposedPorts(8080)
176           .waitingFor(Wait.forLogMessage(".*Started\\ ChatBackendApplication.*\\n", 1))
177           .withLogConsumer(new Slf4jLogConsumer(log, true).withPrefix("BACKEND-3"));
178
179
180   @EqualsAndHashCode
181   @ToString
182   class User
183   {
184     @Getter
185     private final String name;
186     private int serial = 0;
187
188
189     User (String name)
190     {
191       this.name = name;
192     }
193
194
195     int nextSerial()
196     {
197       return ++serial;
198     }
199   }
200
201   @Getter
202   @Setter
203   static class StatusTo
204   {
205     String status;
206   }
207 }