1 package de.juplo.kafka.chat.backend;
3 import lombok.extern.slf4j.Slf4j;
4 import org.awaitility.Awaitility;
5 import org.junit.jupiter.api.BeforeEach;
6 import org.springframework.http.HttpStatus;
7 import org.springframework.web.reactive.function.client.WebClient;
8 import org.testcontainers.containers.*;
9 import org.testcontainers.containers.output.Slf4jLogConsumer;
10 import org.testcontainers.containers.wait.strategy.Wait;
11 import org.testcontainers.utility.DockerImageName;
12 import reactor.core.publisher.Mono;
14 import java.io.IOException;
15 import java.time.Duration;
19 class KafkaHandoverIT extends AbstractHandoverIT
// Prepares the environment and points the shared `webClient` at HAProxy.
// NOTE(review): presumably the @BeforeEach hook (the import is present, the annotation
// line is not visible). Several lines of this method are not visible in this excerpt
// (braces, the arguments of the exec calls, the logging calls); the comments below
// describe only what the visible lines show.
22 void setUpWebClient() throws IOException, InterruptedException
// Two commands are executed inside the Kafka container; their arguments are not
// visible here. The format strings suggest that exit code, stdout and stderr of each
// command are logged afterwards -- TODO confirm against the full file.
27 Container.ExecResult result;
28 result = kafka.execInContainer(
38 "EXIT-CODE={}, STDOUT={}, STDERR={}",
42 result = kafka.execInContainer(
52 "EXIT-CODE={}, STDOUT={}, STDERR={}",
// HAProxy's container port 8400 is mapped to a host port; all requests issued through
// `webClient` therefore go through HAProxy.
61 Integer port = haproxy.getMappedPort(8400);
62 webClient = WebClient.create("http://localhost:" + port);
// First readiness poll (at most 10 minutes): talk to backend1 directly on its mapped
// port 8080 and accept only HTTP 200 with a body whose status equals "UP"
// (case-insensitive). The visible fallback yields `false`, i.e. "not ready yet".
66 .atMost(Duration.ofMinutes(10))
67 .until(() -> WebClient
68 .create("http://localhost:" + backend1.getMappedPort(8080))
70 .uri("/actuator/health")
71 .exchangeToMono(response ->
73 if (response.statusCode().equals(HttpStatus.OK))
76 .bodyToMono(StatusTo.class)
77 .map(StatusTo::getStatus)
78 .map(status -> status.equalsIgnoreCase("UP"));
82 return Mono.just(false);
// Issues a kill command against the HAProxy container. The signal that is sent is not
// visible in this excerpt.
// NOTE(review): presumably a reload signal rather than a real kill, since HAProxy is
// polled again right afterwards -- confirm.
89 .killContainerCmd(haproxy.getContainerId())
// Second readiness poll (at most 10 minutes): same health check as above, but issued
// through `webClient`, i.e. via HAProxy's mapped port 8400.
96 .atMost(Duration.ofMinutes(10))
97 .until(() -> webClient
99 .uri("/actuator/health")
100 .exchangeToMono(response ->
102 if (response.statusCode().equals(HttpStatus.OK))
105 .bodyToMono(StatusTo.class)
106 .map(StatusTo::getStatus)
107 .map(status -> status.equalsIgnoreCase("UP"));
111 return Mono.just(false);
// Docker network that every container declared in this class joins via withNetwork(...),
// so that the containers can address each other by their network aliases.
118 Network network = Network.newNetwork();
// Kafka broker container (image confluentinc/cp-kafka:7.4.0), known inside the network
// under the alias "kafka".
// The additional listener "kafka:9999" matches the bootstrap-servers address that is
// passed to the backend containers.
// The wait strategy requires one log line matching "Kafka Server started"; the container
// output is forwarded to the test logger with the prefix "KAFKA".
// NOTE(review): one builder line between withListener(...) and waitingFor(...) is not
// visible in this excerpt.
120 KafkaContainer kafka =
121 new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.4.0"))
122 .withNetwork(network)
123 .withNetworkAliases("kafka")
124 .withListener(() -> "kafka:9999")
126 .waitingFor(Wait.forLogMessage(".*Kafka\\ Server\\ started.*\\n", 1))
127 .withLogConsumer(new Slf4jLogConsumer(log, true).withPrefix("KAFKA"));
// First chat-backend instance (image juplo/chat-backend:0.0.1-SNAPSHOT), known inside
// the network under the alias "backend-1".
// NEVER_PULL is defined outside this excerpt -- presumably a pull policy that requires
// the image to exist locally; confirm.
// NOTE(review): GenericContainer is used as a raw type here; GenericContainer<?> would
// avoid the unchecked warnings.
129 GenericContainer backend1 =
130 new GenericContainer(DockerImageName.parse("juplo/chat-backend:0.0.1-SNAPSHOT"))
131 .withImagePullPolicy(NEVER_PULL)
132 .withNetwork(network)
133 .withNetworkAliases("backend-1")
// Application arguments. The call that wraps them is not visible in this excerpt
// (presumably withCommand(...)). The instance URI uses this container's own alias,
// the bootstrap server is the Kafka listener "kafka:9999", and the HAProxy runtime
// API is addressed as "haproxy:8401".
135 "--chat.backend.instance-id=backend-1",
136 "--chat.backend.services=kafka",
137 "--chat.backend.kafka.bootstrap-servers=kafka:9999",
138 "--chat.backend.kafka.instance-uri=http://backend-1:8080",
139 "--chat.backend.kafka.num-partitions=10",
140 "--chat.backend.kafka.client-id-prefix=B1",
141 "--chat.backend.kafka.haproxy-runtime-api=haproxy:8401",
142 "--chat.backend.kafka.haproxy-map=/usr/local/etc/haproxy/sharding.map"
// Port 8080 is exposed to the host; setUpWebClient() polls /actuator/health on it.
// The wait strategy requires one log line matching "Started ChatBackendApplication".
144 .withExposedPorts(8080)
145 .waitingFor(Wait.forLogMessage(".*Started\\ ChatBackendApplication.*\\n", 1))
146 .withLogConsumer(new Slf4jLogConsumer(log, true).withPrefix("BACKEND-1"));
// Second chat-backend instance (image juplo/chat-backend:0.0.1-SNAPSHOT), known inside
// the network under the alias "backend-2".
// NEVER_PULL is defined outside this excerpt -- presumably a pull policy that requires
// the image to exist locally; confirm.
// NOTE(review): GenericContainer is used as a raw type here; GenericContainer<?> would
// avoid the unchecked warnings.
148 GenericContainer backend2 =
149 new GenericContainer(DockerImageName.parse("juplo/chat-backend:0.0.1-SNAPSHOT"))
150 .withImagePullPolicy(NEVER_PULL)
151 .withNetwork(network)
152 .withNetworkAliases("backend-2")
// Application arguments. The call that wraps them is not visible in this excerpt
// (presumably withCommand(...)). Only instance-id, instance-uri and client-id-prefix
// are specific to this instance; the Kafka and HAProxy settings are the same values
// the other backend containers receive.
154 "--chat.backend.instance-id=backend-2",
155 "--chat.backend.services=kafka",
156 "--chat.backend.kafka.bootstrap-servers=kafka:9999",
157 "--chat.backend.kafka.instance-uri=http://backend-2:8080",
158 "--chat.backend.kafka.num-partitions=10",
159 "--chat.backend.kafka.client-id-prefix=B2",
160 "--chat.backend.kafka.haproxy-runtime-api=haproxy:8401",
161 "--chat.backend.kafka.haproxy-map=/usr/local/etc/haproxy/sharding.map"
// Port 8080 is exposed to the host. The wait strategy requires one log line matching
// "Started ChatBackendApplication".
163 .withExposedPorts(8080)
164 .waitingFor(Wait.forLogMessage(".*Started\\ ChatBackendApplication.*\\n", 1))
165 .withLogConsumer(new Slf4jLogConsumer(log, true).withPrefix("BACKEND-2"));
// Third chat-backend instance (image juplo/chat-backend:0.0.1-SNAPSHOT), known inside
// the network under the alias "backend-3".
// NEVER_PULL is defined outside this excerpt -- presumably a pull policy that requires
// the image to exist locally; confirm.
// NOTE(review): GenericContainer is used as a raw type here; GenericContainer<?> would
// avoid the unchecked warnings.
167 GenericContainer backend3 =
168 new GenericContainer(DockerImageName.parse("juplo/chat-backend:0.0.1-SNAPSHOT"))
169 .withImagePullPolicy(NEVER_PULL)
170 .withNetwork(network)
171 .withNetworkAliases("backend-3")
// Application arguments. The call that wraps them is not visible in this excerpt
// (presumably withCommand(...)). Only instance-id, instance-uri and client-id-prefix
// are specific to this instance; the Kafka and HAProxy settings are the same values
// the other backend containers receive.
173 "--chat.backend.instance-id=backend-3",
174 "--chat.backend.services=kafka",
175 "--chat.backend.kafka.bootstrap-servers=kafka:9999",
176 "--chat.backend.kafka.instance-uri=http://backend-3:8080",
177 "--chat.backend.kafka.num-partitions=10",
178 "--chat.backend.kafka.client-id-prefix=B3",
179 "--chat.backend.kafka.haproxy-runtime-api=haproxy:8401",
180 "--chat.backend.kafka.haproxy-map=/usr/local/etc/haproxy/sharding.map"
// Port 8080 is exposed to the host. The wait strategy requires one log line matching
// "Started ChatBackendApplication".
182 .withExposedPorts(8080)
183 .waitingFor(Wait.forLogMessage(".*Started\\ ChatBackendApplication.*\\n", 1))
184 .withLogConsumer(new Slf4jLogConsumer(log, true).withPrefix("BACKEND-3"));
// HAProxy container (image haproxytech/haproxy-debian:2.8), known inside the network
// under the alias "haproxy".
// NOTE(review): GenericContainer is used as a raw type here; GenericContainer<?> would
// avoid the unchecked warnings.
186 GenericContainer haproxy =
187 new GenericContainer(DockerImageName.parse("haproxytech/haproxy-debian:2.8"))
188 .withNetwork(network)
189 .withNetworkAliases("haproxy")
// Two classpath resources are mounted into the container: the HAProxy configuration
// and the sharding map. The map path is the same one the backends receive via
// --chat.backend.kafka.haproxy-map. The resource names and bind modes are not visible
// in this excerpt.
190 .withClasspathResourceMapping(
192 "/usr/local/etc/haproxy/haproxy.cfg",
194 .withClasspathResourceMapping(
196 "/usr/local/etc/haproxy/sharding.map",
// 8400 is the port setUpWebClient() sends HTTP requests to; 8401 is the address the
// backends are given as haproxy-runtime-api. The purpose of 8404 is not visible here
// (presumably a stats/monitoring endpoint -- confirm in haproxy.cfg).
198 .withExposedPorts(8400, 8401, 8404)
199 .withLogConsumer(new Slf4jLogConsumer(log, true).withPrefix("HAPROXY"));