1 package de.juplo.kafka.chat.backend;
3 import de.juplo.kafka.chat.backend.api.ChatRoomInfoTo;
4 import de.juplo.kafka.chat.backend.api.MessageTo;
5 import lombok.EqualsAndHashCode;
8 import lombok.ToString;
9 import lombok.extern.slf4j.Slf4j;
10 import org.awaitility.Awaitility;
11 import org.junit.jupiter.api.BeforeEach;
12 import org.junit.jupiter.api.Test;
13 import org.springframework.core.ParameterizedTypeReference;
14 import org.springframework.http.HttpStatus;
15 import org.springframework.http.MediaType;
16 import org.springframework.http.codec.ServerSentEvent;
17 import org.springframework.web.reactive.function.client.WebClient;
18 import org.testcontainers.containers.BindMode;
19 import org.testcontainers.containers.GenericContainer;
20 import org.testcontainers.containers.Network;
21 import org.testcontainers.containers.output.Slf4jLogConsumer;
22 import org.testcontainers.containers.wait.strategy.Wait;
23 import org.testcontainers.images.ImagePullPolicy;
24 import org.testcontainers.junit.jupiter.Testcontainers;
25 import org.testcontainers.utility.DockerImageName;
26 import reactor.core.publisher.Flux;
27 import reactor.core.publisher.Mono;
28 import reactor.util.retry.Retry;
30 import java.io.IOException;
31 import java.time.Duration;
32 import java.util.stream.IntStream;
37 public abstract class AbstractHandoverIT
// Image pull policy whose lambda answers false for every image name, i.e. "never pull".
// Containers using it can only start if their image is already available locally.
39 static final ImagePullPolicy NEVER_PULL = imageName -> false;
// Test scenario, as far as the visible statements show: create a chat room, send
// messages into it as one user, log the results, then receive and log the messages
// of that room.
// NOTE(review): several lines of this method are not visible in this excerpt
// (e.g. the source of the index stream and the terminal operation of the receive
// pipeline); the comments below describe the visible statements only.
43 void test() throws InterruptedException
// Create the chat room "bar"; block() waits for the reactive call to complete.
45 ChatRoomInfoTo chatRoom = createChatRoom("bar").block();
46 User user = new User("nerd");
// One sendMessage(...) call per index i, with the text "Message #<i>".
49 .mapToObj(i ->sendMessage(chatRoom, user, "Message #" + i))
51 .map(MessageTo::toString)
// On error, retry up to 10 times with a fixed delay of one second between attempts.
52 .retryWhen(Retry.fixedDelay(10, Duration.ofSeconds(1)))
54 .forEach(result -> log.info("{}", result));
// receiveMessages(...) is defined outside this excerpt; each element it emits is logged.
56 receiveMessages(chatRoom)
58 .doOnNext(message -> log.info("message: {}", message))
// Hook that concrete test classes must implement; it may throw IOException or
// InterruptedException.
// NOTE(review): presumably invoked from setUp() — the call is not visible in this excerpt.
64 abstract void setUpExtra() throws IOException, InterruptedException;
// Prepares the environment for the test. Visible steps, in order:
//   1. wait until the health endpoint of backend1 reports "UP",
//   2. issue a kill command for the haproxy container,
//   3. wait until the health endpoint reached through haproxy (port 8400) reports "UP".
// NOTE(review): lines of this method are missing from this excerpt, among them the
// receivers of both waits (presumably Awaitility, which is imported) and the
// receiver of the kill command — confirm against the full file.
67 void setUp() throws Exception
// First wait: give up after 10 minutes at the latest.
77 .atMost(Duration.ofMinutes(10))
78 .until(() -> WebClient
// Address backend1 directly through the host port mapped to container port 8080.
79 .create("http://localhost:" + backend1.getMappedPort(8080))
81 .uri("/actuator/health")
82 .exchangeToMono(response ->
// Only an HTTP 200 response is inspected further.
84 if (response.statusCode().equals(HttpStatus.OK))
87 .bodyToMono(StatusTo.class)
88 .map(StatusTo::getStatus)
// The condition holds when the reported status is "UP", ignoring case.
89 .map(status -> status.equalsIgnoreCase("UP"));
// Fallback result for the visible return path: condition not yet fulfilled.
93 return Mono.just(false);
// Kill command for the haproxy container.
// NOTE(review): the signal that is sent is not visible in this excerpt.
100 .killContainerCmd(haproxy.getContainerId())
// Second wait: same check as above, but through the host port mapped to
// port 8400 of the haproxy container.
107 .atMost(Duration.ofMinutes(10))
108 .until(() -> WebClient
109 .create("http://localhost:" + haproxy.getMappedPort(8400))
111 .uri("/actuator/health")
112 .exchangeToMono(response ->
114 if (response.statusCode().equals(HttpStatus.OK))
117 .bodyToMono(StatusTo.class)
118 .map(StatusTo::getStatus)
119 .map(status -> status.equalsIgnoreCase("UP"));
123 return Mono.just(false);
// Docker network that the haproxy container and the backend containers of this
// class are attached to.
129 Network network = Network.newNetwork();
// HAProxy container (image haproxytech/haproxy-debian:2.8), attached to the
// network under the alias "haproxy".
// NOTE(review): raw GenericContainer type — GenericContainer<?> would avoid the
// raw-type warning.
131 GenericContainer haproxy =
132 new GenericContainer(DockerImageName.parse("haproxytech/haproxy-debian:2.8"))
133 .withNetwork(network)
134 .withNetworkAliases("haproxy")
// Two classpath resources are mapped into the container: the HAProxy
// configuration file and the sharding map. The resource names and bind modes
// are not visible in this excerpt.
135 .withClasspathResourceMapping(
137 "/usr/local/etc/haproxy/haproxy.cfg",
139 .withClasspathResourceMapping(
141 "/usr/local/etc/haproxy/sharding.map",
// Ports published to the host; setUp() uses the mapping of port 8400 for its
// health check.
143 .withExposedPorts(8400, 8401, 8404)
// Forward the container output to the SLF4J logger, prefixed with "HAPROXY".
144 .withLogConsumer(new Slf4jLogConsumer(log, true).withPrefix("HAPROXY"));
// Command for the first backend container; implemented by the concrete test class.
146 abstract String[] getCommandBackend1();
// First backend container (image juplo/chat-backend:0.0.1-SNAPSHOT), attached to
// the network under the alias "backend-1".
// NOTE(review): this field initializer calls the abstract getCommandBackend1()
// while the instance is still being constructed, i.e. before the fields of the
// subclass are initialized — implementations must not rely on subclass state.
// NOTE(review): raw GenericContainer type — GenericContainer<?> would avoid the
// raw-type warning.
147 GenericContainer backend1 =
148 new GenericContainer(DockerImageName.parse("juplo/chat-backend:0.0.1-SNAPSHOT"))
// The image is never pulled, so it has to be present locally.
149 .withImagePullPolicy(NEVER_PULL)
150 .withNetwork(network)
151 .withNetworkAliases("backend-1")
152 .withCommand(getCommandBackend1())
153 .withExposedPorts(8080)
// The container counts as started once a log line matching
// "Started ChatBackendApplication" has been seen once.
154 .waitingFor(Wait.forLogMessage(".*Started\\ ChatBackendApplication.*\\n", 1))
// Forward the container output to the SLF4J logger, prefixed with "BACKEND-1".
155 .withLogConsumer(new Slf4jLogConsumer(log, true).withPrefix("BACKEND-1"));
// Command for the second backend container; implemented by the concrete test class.
157 abstract String[] getCommandBackend2();
// Second backend container (image juplo/chat-backend:0.0.1-SNAPSHOT), attached to
// the network under the alias "backend-2".
// NOTE(review): this field initializer calls the abstract getCommandBackend2()
// while the instance is still being constructed, i.e. before the fields of the
// subclass are initialized — implementations must not rely on subclass state.
// NOTE(review): raw GenericContainer type — GenericContainer<?> would avoid the
// raw-type warning.
158 GenericContainer backend2 =
159 new GenericContainer(DockerImageName.parse("juplo/chat-backend:0.0.1-SNAPSHOT"))
// The image is never pulled, so it has to be present locally.
160 .withImagePullPolicy(NEVER_PULL)
161 .withNetwork(network)
162 .withNetworkAliases("backend-2")
163 .withCommand(getCommandBackend2())
164 .withExposedPorts(8080)
// The container counts as started once a log line matching
// "Started ChatBackendApplication" has been seen once.
165 .waitingFor(Wait.forLogMessage(".*Started\\ ChatBackendApplication.*\\n", 1))
// Forward the container output to the SLF4J logger, prefixed with "BACKEND-2".
166 .withLogConsumer(new Slf4jLogConsumer(log, true).withPrefix("BACKEND-2"));
// Command for the third backend container; implemented by the concrete test class.
168 abstract String[] getCommandBackend3();
// Third backend container (image juplo/chat-backend:0.0.1-SNAPSHOT), attached to
// the network under the alias "backend-3".
// NOTE(review): this field initializer calls the abstract getCommandBackend3()
// while the instance is still being constructed, i.e. before the fields of the
// subclass are initialized — implementations must not rely on subclass state.
// NOTE(review): raw GenericContainer type — GenericContainer<?> would avoid the
// raw-type warning.
169 GenericContainer backend3 =
170 new GenericContainer(DockerImageName.parse("juplo/chat-backend:0.0.1-SNAPSHOT"))
// The image is never pulled, so it has to be present locally.
171 .withImagePullPolicy(NEVER_PULL)
172 .withNetwork(network)
173 .withNetworkAliases("backend-3")
174 .withCommand(getCommandBackend3())
175 .withExposedPorts(8080)
// The container counts as started once a log line matching
// "Started ChatBackendApplication" has been seen once.
176 .waitingFor(Wait.forLogMessage(".*Started\\ ChatBackendApplication.*\\n", 1))
// Forward the container output to the SLF4J logger, prefixed with "BACKEND-3".
177 .withLogConsumer(new Slf4jLogConsumer(log, true).withPrefix("BACKEND-3"));
// NOTE(review): the declaration of the class owning these two fields is not
// visible in this excerpt — presumably the User helper that test() constructs
// with a name; confirm against the full file.
// Immutable name, set once at construction.
185 private final String name;
// Mutable int starting at 0; presumably a running number — where it is
// advanced is not visible here.
186 private int serial = 0;
203 static class StatusTo