1 package de.juplo.kafka.chat.backend;
3 import de.juplo.kafka.chat.backend.api.ChatRoomInfoTo;
4 import de.juplo.kafka.chat.backend.api.MessageTo;
5 import lombok.EqualsAndHashCode;
8 import lombok.ToString;
9 import lombok.extern.slf4j.Slf4j;
10 import org.awaitility.Awaitility;
11 import org.junit.jupiter.api.BeforeEach;
12 import org.junit.jupiter.api.Test;
13 import org.springframework.core.ParameterizedTypeReference;
14 import org.springframework.http.HttpStatus;
15 import org.springframework.http.MediaType;
16 import org.springframework.http.codec.ServerSentEvent;
17 import org.springframework.web.reactive.function.client.WebClient;
18 import org.testcontainers.containers.BindMode;
19 import org.testcontainers.containers.GenericContainer;
20 import org.testcontainers.containers.Network;
21 import org.testcontainers.containers.output.Slf4jLogConsumer;
22 import org.testcontainers.containers.wait.strategy.Wait;
23 import org.testcontainers.images.ImagePullPolicy;
24 import org.testcontainers.junit.jupiter.Testcontainers;
25 import org.testcontainers.utility.DockerImageName;
26 import reactor.core.publisher.Flux;
27 import reactor.core.publisher.Mono;
28 import reactor.util.retry.Retry;
30 import java.io.IOException;
31 import java.time.Duration;
32 import java.util.stream.IntStream;
37 public abstract class AbstractHandoverIT
40 void test() throws InterruptedException
42 ChatRoomInfoTo chatRoom = createChatRoom("bar").block();
43 User user = new User("nerd");
46 .mapToObj(i ->sendMessage(chatRoom, user, "Message #" + i))
48 .map(MessageTo::toString)
49 .retryWhen(Retry.fixedDelay(10, Duration.ofSeconds(1)))
51 .forEach(result -> log.info("{}", result));
53 receiveMessages(chatRoom)
55 .doOnNext(message -> log.info("message: {}", message))
61 abstract void setUpExtra() throws IOException, InterruptedException;
64 void setUp() throws Exception
74 .atMost(Duration.ofMinutes(10))
75 .until(() -> WebClient
76 .create("http://localhost:" + backend1.getMappedPort(8080))
78 .uri("/actuator/health")
79 .exchangeToMono(response ->
81 if (response.statusCode().equals(HttpStatus.OK))
84 .bodyToMono(StatusTo.class)
85 .map(StatusTo::getStatus)
86 .map(status -> status.equalsIgnoreCase("UP"));
90 return Mono.just(false);
97 .killContainerCmd(haproxy.getContainerId())
104 .atMost(Duration.ofMinutes(10))
105 .until(() -> WebClient
106 .create("http://localhost:" + haproxy.getMappedPort(8400))
108 .uri("/actuator/health")
109 .exchangeToMono(response ->
111 if (response.statusCode().equals(HttpStatus.OK))
114 .bodyToMono(StatusTo.class)
115 .map(StatusTo::getStatus)
116 .map(status -> status.equalsIgnoreCase("UP"));
120 return Mono.just(false);
126 GenericContainer haproxy;
128 abstract String[] getCommandBackend1();
129 GenericContainer backend1;
130 abstract String[] getCommandBackend2();
131 GenericContainer backend2;
133 abstract String[] getCommandBackend3();
134 GenericContainer backend3 =
135 new GenericContainer(DockerImageName.parse("juplo/chat-backend:0.0.1-SNAPSHOT"))
136 .withImagePullPolicy(NEVER_PULL)
137 .withNetwork(network)
138 .withNetworkAliases("backend-3")
139 .withCommand(getCommandBackend3())
140 .withExposedPorts(8080)
141 .waitingFor(Wait.forLogMessage(".*Started\\ ChatBackendApplication.*\\n", 1))
142 .withLogConsumer(new Slf4jLogConsumer(log, true).withPrefix("BACKEND-3"));
150 private final String name;
151 private int serial = 0;
168 static class StatusTo