1 package de.juplo.kafka.chat.backend;
3 import de.juplo.kafka.chat.backend.api.ChatRoomInfoTo;
4 import de.juplo.kafka.chat.backend.api.MessageTo;
5 import lombok.EqualsAndHashCode;
8 import lombok.ToString;
9 import lombok.extern.slf4j.Slf4j;
10 import org.awaitility.Awaitility;
11 import org.junit.jupiter.api.BeforeEach;
12 import org.junit.jupiter.api.Test;
13 import org.springframework.core.ParameterizedTypeReference;
14 import org.springframework.http.HttpStatus;
15 import org.springframework.http.MediaType;
16 import org.springframework.http.codec.ServerSentEvent;
17 import org.springframework.web.reactive.function.client.WebClient;
18 import org.testcontainers.containers.BindMode;
19 import org.testcontainers.containers.GenericContainer;
20 import org.testcontainers.containers.Network;
21 import org.testcontainers.containers.output.Slf4jLogConsumer;
22 import org.testcontainers.containers.wait.strategy.Wait;
23 import org.testcontainers.images.ImagePullPolicy;
24 import org.testcontainers.junit.jupiter.Testcontainers;
25 import org.testcontainers.utility.DockerImageName;
26 import reactor.core.publisher.Flux;
27 import reactor.core.publisher.Mono;
28 import reactor.util.retry.Retry;
30 import java.io.IOException;
31 import java.time.Duration;
32 import java.util.stream.IntStream;
37 public abstract class AbstractHandoverIT
39 static final ImagePullPolicy NEVER_PULL = imageName -> false;
40 static final ParameterizedTypeReference<ServerSentEvent<String>> SSE_TYPE = new ParameterizedTypeReference<>() {};
44 void test() throws InterruptedException
46 ChatRoomInfoTo chatRoom = createChatRoom("bar").block();
47 User user = new User("nerd");
50 .mapToObj(i ->sendMessage(chatRoom, user, "Message #" + i))
52 .map(MessageTo::toString)
53 .retryWhen(Retry.fixedDelay(10, Duration.ofSeconds(1)))
55 .forEach(result -> log.info("{}", result));
57 receiveMessages(chatRoom)
59 .doOnNext(message -> log.info("message: {}", message))
64 Mono<ChatRoomInfoTo> createChatRoom(String name)
69 .contentType(MediaType.TEXT_PLAIN)
71 .accept(MediaType.APPLICATION_JSON)
72 .exchangeToMono(response ->
74 if (response.statusCode().equals(HttpStatus.OK))
76 return response.bodyToMono(ChatRoomInfoTo.class);
80 return response.createError();
85 Mono<MessageTo> sendMessage(
86 ChatRoomInfoTo chatRoom,
93 "/{chatRoomId}/{username}/{serial}",
97 .contentType(MediaType.TEXT_PLAIN)
98 .accept(MediaType.APPLICATION_JSON)
100 .exchangeToMono(response ->
102 if (response.statusCode().equals(HttpStatus.OK))
104 return response.bodyToMono(MessageTo.class);
108 return response.createError();
113 Flux<ServerSentEvent<String>> receiveMessages(ChatRoomInfoTo chatRoom)
118 "/{chatRoomId}/listen",
120 .accept(MediaType.TEXT_EVENT_STREAM)
122 .bodyToFlux(SSE_TYPE);
129 abstract void setUpExtra() throws IOException, InterruptedException;
132 void setUp() throws Exception
140 Integer port = haproxy.getMappedPort(8400);
141 webClient = WebClient.create("http://localhost:" + port);
145 .atMost(Duration.ofMinutes(10))
146 .until(() -> WebClient
147 .create("http://localhost:" + backend1.getMappedPort(8080))
149 .uri("/actuator/health")
150 .exchangeToMono(response ->
152 if (response.statusCode().equals(HttpStatus.OK))
155 .bodyToMono(StatusTo.class)
156 .map(StatusTo::getStatus)
157 .map(status -> status.equalsIgnoreCase("UP"));
161 return Mono.just(false);
168 .killContainerCmd(haproxy.getContainerId())
175 .atMost(Duration.ofMinutes(10))
176 .until(() -> webClient
178 .uri("/actuator/health")
179 .exchangeToMono(response ->
181 if (response.statusCode().equals(HttpStatus.OK))
184 .bodyToMono(StatusTo.class)
185 .map(StatusTo::getStatus)
186 .map(status -> status.equalsIgnoreCase("UP"));
190 return Mono.just(false);
196 Network network = Network.newNetwork();
198 GenericContainer haproxy =
199 new GenericContainer(DockerImageName.parse("haproxytech/haproxy-debian:2.8"))
200 .withNetwork(network)
201 .withNetworkAliases("haproxy")
202 .withClasspathResourceMapping(
204 "/usr/local/etc/haproxy/haproxy.cfg",
206 .withClasspathResourceMapping(
208 "/usr/local/etc/haproxy/sharding.map",
210 .withExposedPorts(8400, 8401, 8404)
211 .withLogConsumer(new Slf4jLogConsumer(log, true).withPrefix("HAPROXY"));
213 abstract String[] getCommandBackend1();
214 GenericContainer backend1 =
215 new GenericContainer(DockerImageName.parse("juplo/chat-backend:0.0.1-SNAPSHOT"))
216 .withImagePullPolicy(NEVER_PULL)
217 .withNetwork(network)
218 .withNetworkAliases("backend-1")
219 .withCommand(getCommandBackend1())
220 .withExposedPorts(8080)
221 .waitingFor(Wait.forLogMessage(".*Started\\ ChatBackendApplication.*\\n", 1))
222 .withLogConsumer(new Slf4jLogConsumer(log, true).withPrefix("BACKEND-1"));
224 abstract String[] getCommandBackend2();
225 GenericContainer backend2 =
226 new GenericContainer(DockerImageName.parse("juplo/chat-backend:0.0.1-SNAPSHOT"))
227 .withImagePullPolicy(NEVER_PULL)
228 .withNetwork(network)
229 .withNetworkAliases("backend-2")
230 .withCommand(getCommandBackend2())
231 .withExposedPorts(8080)
232 .waitingFor(Wait.forLogMessage(".*Started\\ ChatBackendApplication.*\\n", 1))
233 .withLogConsumer(new Slf4jLogConsumer(log, true).withPrefix("BACKEND-2"));
235 abstract String[] getCommandBackend3();
236 GenericContainer backend3 =
237 new GenericContainer(DockerImageName.parse("juplo/chat-backend:0.0.1-SNAPSHOT"))
238 .withImagePullPolicy(NEVER_PULL)
239 .withNetwork(network)
240 .withNetworkAliases("backend-3")
241 .withCommand(getCommandBackend3())
242 .withExposedPorts(8080)
243 .waitingFor(Wait.forLogMessage(".*Started\\ ChatBackendApplication.*\\n", 1))
244 .withLogConsumer(new Slf4jLogConsumer(log, true).withPrefix("BACKEND-3"));
252 private final String name;
253 private int serial = 0;
270 static class StatusTo