1 package de.juplo.kafka.chat.backend;
3 import de.juplo.kafka.chat.backend.api.ChatRoomInfoTo;
4 import de.juplo.kafka.chat.backend.api.MessageTo;
5 import lombok.EqualsAndHashCode;
8 import lombok.ToString;
9 import lombok.extern.slf4j.Slf4j;
10 import org.awaitility.Awaitility;
11 import org.junit.jupiter.api.BeforeAll;
12 import org.junit.jupiter.api.BeforeEach;
13 import org.junit.jupiter.api.Test;
14 import org.springframework.core.ParameterizedTypeReference;
15 import org.springframework.http.HttpStatus;
16 import org.springframework.http.MediaType;
17 import org.springframework.http.codec.ServerSentEvent;
18 import org.springframework.web.reactive.function.client.WebClient;
19 import org.testcontainers.containers.*;
20 import org.testcontainers.containers.output.Slf4jLogConsumer;
21 import org.testcontainers.containers.wait.strategy.Wait;
22 import org.testcontainers.utility.DockerImageName;
23 import reactor.core.publisher.Flux;
24 import reactor.core.publisher.Mono;
25 import reactor.util.retry.Retry;
27 import java.io.IOException;
28 import java.time.Duration;
29 import java.util.stream.IntStream;
/**
 * Integration test that talks to the chat backend over HTTP through an
 * HAProxy container, with Kafka and the backend instances run as
 * Testcontainers (see the static container fields of this class).
 * NOTE(review): the class-level annotations and the contents of
 * AbstractHandoverIT are not visible in this excerpt.
 */
33 class KafkaHandoverIT extends AbstractHandoverIT
/**
 * Creates the chat room "bar", sends messages to it as user "nerd", then
 * reads the room's message stream and logs the resulting count.
 * NOTE(review): the lines that build the index stream, subscribe to the
 * send publishers and reduce the received stream to {@code count} are not
 * visible in this excerpt.
 */
36 void test() throws InterruptedException
// Blocks the test thread until the create-room response has been received.
38 ChatRoomInfoTo chatRoom = createChatRoom("bar").block();
39 User user = new User("nerd");
// One send request per index; the message text carries the index.
42 .mapToObj(i ->sendMessage(chatRoom, user, "Message #" + i))
44 .map(MessageTo::toString)
// On error, re-subscribe up to 10 times with a fixed 1 second delay.
45 .retryWhen(Retry.fixedDelay(10, Duration.ofSeconds(1)))
47 .forEach(result -> log.info("{}", result));
// Listen on the same room and log every event as it arrives.
49 Long count = receiveMessages(chatRoom)
50 .doOnNext(message -> log.info("message: {}", message))
53 log.info("Received {} messages", count);
/**
 * Sends a request with a text/plain body that accepts a JSON response and
 * decodes a 200 OK response into a {@link ChatRoomInfoTo}.
 * NOTE(review): the HTTP method, URI and body lines are not visible in this
 * excerpt; presumably {@code name} is sent as the request body - confirm.
 *
 * @param name name of the chat room to create
 * @return a Mono that emits the decoded response body on 200 OK; the other
 *         visible branch returns {@code response.createError()}
 */
56 Mono<ChatRoomInfoTo> createChatRoom(String name)
61 .contentType(MediaType.TEXT_PLAIN)
63 .accept(MediaType.APPLICATION_JSON)
64 .exchangeToMono(response ->
// Only an exact 200 OK is decoded as a chat room.
66 if (response.statusCode().equals(HttpStatus.OK))
68 return response.bodyToMono(ChatRoomInfoTo.class);
// Fallback: turn the response into an error signal.
72 return response.createError();
/**
 * Sends a text/plain request to a URI built from the template
 * {@code /{chatRoomId}/{username}/{serial}} and decodes a 200 OK response
 * into a {@link MessageTo}.
 * NOTE(review): only the first parameter is visible in this excerpt; the call
 * in {@code test()} passes a chat room, a {@code User} and a message string.
 * The HTTP method and the values bound to the URI variables are not visible.
 *
 * @param chatRoom the chat room the message is sent to
 * @return a Mono that emits the decoded response body on 200 OK; the other
 *         visible branch returns {@code response.createError()}
 */
77 Mono<MessageTo> sendMessage(
78 ChatRoomInfoTo chatRoom,
85 "/{chatRoomId}/{username}/{serial}",
89 .contentType(MediaType.TEXT_PLAIN)
90 .accept(MediaType.APPLICATION_JSON)
92 .exchangeToMono(response ->
// Only an exact 200 OK is decoded as a message.
94 if (response.statusCode().equals(HttpStatus.OK))
96 return response.bodyToMono(MessageTo.class);
// Fallback: turn the response into an error signal.
100 return response.createError();
/**
 * Requests the {@code /{chatRoomId}/listen} endpoint as a
 * {@code text/event-stream} and exposes the response as a stream of
 * server-sent events with String payloads.
 * NOTE(review): the HTTP method and the value bound to {@code chatRoomId}
 * are not visible in this excerpt.
 *
 * @param chatRoom the chat room to listen to
 * @return the server-sent events of the response body
 */
105 Flux<ServerSentEvent<String>> receiveMessages(ChatRoomInfoTo chatRoom)
110 "/{chatRoomId}/listen",
112 .accept(MediaType.TEXT_EVENT_STREAM)
// sseType carries the full generic type ServerSentEvent<String> for decoding.
114 .bodyToFlux(sseType);
/**
 * Points {@code webClient} at the HAProxy container and waits until the
 * backend reports healthy, first directly and then through {@code webClient}.
 * NOTE(review): the method's annotations, the Awaitility call that starts
 * each wait and the execution of the kill command are not visible in this
 * excerpt.
 */
118 void setUpWebClient()
// Host port that Testcontainers mapped to HAProxy's container port 8400.
120 Integer port = HAPROXY.getMappedPort(8400);
121 webClient = WebClient.create("http://localhost:" + port);
// First wait (at most 10 minutes): probe BACKEND_1 directly on its mapped port 8080.
125 .atMost(Duration.ofMinutes(10))
126 .until(() -> WebClient
127 .create("http://localhost:" + BACKEND_1.getMappedPort(8080))
129 .uri("/actuator/health")
130 .exchangeToMono(response ->
// A 200 OK counts as healthy only if the reported status is "UP" (case-insensitive).
132 if (response.statusCode().equals(HttpStatus.OK))
135 .bodyToMono(StatusTo.class)
136 .map(StatusTo::getStatus)
137 .map(status -> status.equalsIgnoreCase("UP"));
// Fallback result: not healthy yet.
141 return Mono.just(false);
// Issues a kill command against the HAProxy container.
// NOTE(review): the signal is not visible here; presumably a reload signal - confirm.
148 .killContainerCmd(HAPROXY.getContainerId())
// Second wait (at most 10 minutes): same health probe, but through webClient,
// i.e. via the HAProxy port configured above.
155 .atMost(Duration.ofMinutes(10))
156 .until(() -> webClient
158 .uri("/actuator/health")
159 .exchangeToMono(response ->
161 if (response.statusCode().equals(HttpStatus.OK))
164 .bodyToMono(StatusTo.class)
165 .map(StatusTo::getStatus)
166 .map(status -> status.equalsIgnoreCase("UP"));
// Fallback result: not healthy yet.
170 return Mono.just(false);
/**
 * Runs two commands inside the Kafka container and logs the outcome of each.
 * NOTE(review): the command names and most of their arguments are not visible
 * in this excerpt, and neither are the method's annotations or the statements
 * that start the containers; presumably the commands create the Kafka topics
 * the backend needs - confirm.
 *
 * @throws IOException declared by the method; presumably from execInContainer
 * @throws InterruptedException declared by the method; presumably from execInContainer
 */
180 static void setUpDocker() throws IOException, InterruptedException
// Reused for the result of both command executions.
185 Container.ExecResult result;
// First command.
186 result = KAFKA.execInContainer(
188 "--bootstrap-server",
196 "EXIT-CODE={}, STDOUT={}, STDERR={}",
197 result.getExitCode(),
// Second command.
200 result = KAFKA.execInContainer(
202 "--bootstrap-server",
210 "EXIT-CODE={}, STDOUT={}, STDERR={}",
211 result.getExitCode(),
// Starting the second and third backend is currently disabled.
216 // BACKEND_2.start();
217 // BACKEND_3.start();
/** Docker network that every container declared in this class is attached to. */
220 static Network NETWORK = Network.newNetwork();
/**
 * Kafka broker container (confluentinc/cp-kafka:7.4.0), reachable inside
 * {@link #NETWORK} under the alias "kafka".
 */
222 static KafkaContainer KAFKA =
223 new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.4.0"))
224 .withNetwork(NETWORK)
225 .withNetworkAliases("kafka")
// Additional listener; the backends use kafka:9999 as their bootstrap server.
226 .withListener(() -> "kafka:9999")
// Considered ready once the broker has logged "Kafka Server started" once.
228 .waitingFor(Wait.forLogMessage(".*Kafka\\ Server\\ started.*\\n", 1))
// Forward the container output to this class' logger, prefixed with "KAFKA".
229 .withLogConsumer(new Slf4jLogConsumer(log, true).withPrefix("KAFKA"));
/**
 * First chat-backend container (juplo/chat-backend:0.0.1-SNAPSHOT), reachable
 * inside {@link #NETWORK} under the alias "backend-1".
 * NOTE(review): the call that wraps the argument strings below is not visible
 * in this excerpt; presumably they are passed as the container command.
 */
231 static GenericContainer BACKEND_1 =
232 new GenericContainer(DockerImageName.parse("juplo/chat-backend:0.0.1-SNAPSHOT"))
// NOTE(review): NEVER_PULL is declared outside this excerpt; presumably the
// image has to be available locally - confirm.
233 .withImagePullPolicy(NEVER_PULL)
234 .withNetwork(NETWORK)
235 .withNetworkAliases("backend-1")
237 "--chat.backend.instance-id=backend-1",
238 "--chat.backend.services=kafka",
// Matches the extra listener configured on the KAFKA container.
239 "--chat.backend.kafka.bootstrap-servers=kafka:9999",
240 "--chat.backend.kafka.instance-uri=http://backend-1:8080",
241 "--chat.backend.kafka.num-partitions=10",
242 "--chat.backend.kafka.client-id-prefix=B1",
// Port 8401 and the map path match what the HAPROXY container exposes and mounts.
243 "--chat.backend.kafka.haproxy-runtime-api=haproxy:8401",
244 "--chat.backend.kafka.haproxy-map=/usr/local/etc/haproxy/sharding.map"
246 .withExposedPorts(8080)
// Considered ready once "Started ChatBackendApplication" has been logged once.
248 .waitingFor(Wait.forLogMessage(".*Started\\ ChatBackendApplication.*\\n", 1))
249 .withLogConsumer(new Slf4jLogConsumer(log, true).withPrefix("BACKEND-1"));
/**
 * Second chat-backend container; configured like {@code BACKEND_1} except for
 * the network alias, instance id, instance URI, client-id prefix and log prefix.
 * The statement that would start it in {@code setUpDocker()} is commented out.
 */
251 static GenericContainer BACKEND_2 =
252 new GenericContainer(DockerImageName.parse("juplo/chat-backend:0.0.1-SNAPSHOT"))
253 .withImagePullPolicy(NEVER_PULL)
254 .withNetwork(NETWORK)
255 .withNetworkAliases("backend-2")
257 "--chat.backend.instance-id=backend-2",
258 "--chat.backend.services=kafka",
259 "--chat.backend.kafka.bootstrap-servers=kafka:9999",
260 "--chat.backend.kafka.instance-uri=http://backend-2:8080",
261 "--chat.backend.kafka.num-partitions=10",
262 "--chat.backend.kafka.client-id-prefix=B2",
263 "--chat.backend.kafka.haproxy-runtime-api=haproxy:8401",
264 "--chat.backend.kafka.haproxy-map=/usr/local/etc/haproxy/sharding.map"
266 .withExposedPorts(8080)
// Considered ready once "Started ChatBackendApplication" has been logged once.
268 .waitingFor(Wait.forLogMessage(".*Started\\ ChatBackendApplication.*\\n", 1))
269 .withLogConsumer(new Slf4jLogConsumer(log, true).withPrefix("BACKEND-2"));
/**
 * Third chat-backend container; configured like {@code BACKEND_1} except for
 * the network alias, instance id, instance URI, client-id prefix and log prefix.
 * The statement that would start it in {@code setUpDocker()} is commented out.
 */
272 static GenericContainer BACKEND_3 =
273 new GenericContainer(DockerImageName.parse("juplo/chat-backend:0.0.1-SNAPSHOT"))
274 .withImagePullPolicy(NEVER_PULL)
275 .withNetwork(NETWORK)
276 .withNetworkAliases("backend-3")
278 "--chat.backend.instance-id=backend-3",
279 "--chat.backend.services=kafka",
280 "--chat.backend.kafka.bootstrap-servers=kafka:9999",
281 "--chat.backend.kafka.instance-uri=http://backend-3:8080",
282 "--chat.backend.kafka.num-partitions=10",
283 "--chat.backend.kafka.client-id-prefix=B3",
284 "--chat.backend.kafka.haproxy-runtime-api=haproxy:8401",
285 "--chat.backend.kafka.haproxy-map=/usr/local/etc/haproxy/sharding.map"
287 .withExposedPorts(8080)
// Considered ready once "Started ChatBackendApplication" has been logged once.
289 .waitingFor(Wait.forLogMessage(".*Started\\ ChatBackendApplication.*\\n", 1))
290 .withLogConsumer(new Slf4jLogConsumer(log, true).withPrefix("BACKEND-3"));
/**
 * HAProxy container (haproxytech/haproxy-debian:2.8), reachable inside
 * {@link #NETWORK} under the alias "haproxy". Its configuration file and
 * sharding map are mounted from classpath resources.
 * NOTE(review): the resource names and bind modes of the two mappings are not
 * visible in this excerpt.
 */
292 static GenericContainer HAPROXY =
293 new GenericContainer(DockerImageName.parse("haproxytech/haproxy-debian:2.8"))
294 .withNetwork(NETWORK)
295 .withNetworkAliases("haproxy")
296 .withClasspathResourceMapping(
298 "/usr/local/etc/haproxy/haproxy.cfg",
300 .withClasspathResourceMapping(
// Same path the backends receive as their haproxy-map setting.
302 "/usr/local/etc/haproxy/sharding.map",
// 8400 is the port the test's webClient connects to; 8401 is the port the
// backends receive as haproxy-runtime-api.
// NOTE(review): the purpose of 8404 is not visible in this file.
304 .withExposedPorts(8400, 8401, 8404)
305 .withLogConsumer(new Slf4jLogConsumer(log, true).withPrefix("HAPROXY"));
// NOTE(review): the enclosing class declaration is not visible in this
// excerpt; presumably these are the fields of the User class that test()
// instantiates with new User("nerd") - confirm.
// Immutable name.
312 private final String name;
// Counter starting at 0; presumably supplies the {serial} path segment used
// by sendMessage - confirm.
313 private int serial = 0;
/**
 * Transfer object that the "/actuator/health" response body is decoded into;
 * setUpWebClient() reads its status via {@code getStatus()}.
 * NOTE(review): the class body and annotations are not visible in this excerpt.
 */
330 static class StatusTo
/**
 * Type token for {@code ServerSentEvent<String>}; the empty anonymous subclass
 * is what lets ParameterizedTypeReference capture the generic type. It is
 * passed to {@code bodyToFlux} in receiveMessages().
 */
335 ParameterizedTypeReference<ServerSentEvent<String>> sseType = new ParameterizedTypeReference<>() {};