import lombok.extern.slf4j.Slf4j;
import org.awaitility.Awaitility;
-import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.springframework.http.HttpStatus;
import org.springframework.web.reactive.function.client.WebClient;
class KafkaHandoverIT extends AbstractHandoverIT
{
@BeforeEach
- void setUpWebClient()
+ void setUpWebClient() throws IOException, InterruptedException
{
- Integer port = HAPROXY.getMappedPort(8400);
+ kafka.start();
+ haproxy.start();
+
+ Container.ExecResult result;
+ result = kafka.execInContainer(
+ "kafka-topics",
+ "--bootstrap-server",
+ "kafka:9999",
+ "--create",
+ "--topic",
+ "info_channel",
+ "--partitions",
+ "3");
+ log.info(
+ "EXIT-CODE={}, STDOUT={}, STDERR={}",
+ result.getExitCode(),
+ result.getStdout(),
+ result.getStdout());
+ result = kafka.execInContainer(
+ "kafka-topics",
+ "--bootstrap-server",
+ "kafka:9999",
+ "--create",
+ "--topic",
+ "data_channel",
+ "--partitions",
+ "10");
+ log.info(
+ "EXIT-CODE={}, STDOUT={}, STDERR={}",
+ result.getExitCode(),
+ result.getStdout(),
+ result.getStdout());
+
+ backend1.start();
+ // backend2.start();
+ // backend3.start();
+
+ Integer port = haproxy.getMappedPort(8400);
webClient = WebClient.create("http://localhost:" + port);
Awaitility
.await()
.atMost(Duration.ofMinutes(10))
.until(() -> WebClient
- .create("http://localhost:" + BACKEND_1.getMappedPort(8080))
+ .create("http://localhost:" + backend1.getMappedPort(8080))
.get()
.uri("/actuator/health")
.exchangeToMono(response ->
})
.block());
- HAPROXY
+ haproxy
.getDockerClient()
- .killContainerCmd(HAPROXY.getContainerId())
+ .killContainerCmd(haproxy.getContainerId())
.withSignal("HUP")
.exec();
}
- @BeforeAll
- static void setUpDocker() throws IOException, InterruptedException
- {
- KAFKA.start();
- HAPROXY.start();
+ Network network = Network.newNetwork();
- Container.ExecResult result;
- result = KAFKA.execInContainer(
- "kafka-topics",
- "--bootstrap-server",
- "kafka:9999",
- "--create",
- "--topic",
- "info_channel",
- "--partitions",
- "3");
- log.info(
- "EXIT-CODE={}, STDOUT={}, STDERR={}",
- result.getExitCode(),
- result.getStdout(),
- result.getStdout());
- result = KAFKA.execInContainer(
- "kafka-topics",
- "--bootstrap-server",
- "kafka:9999",
- "--create",
- "--topic",
- "data_channel",
- "--partitions",
- "10");
- log.info(
- "EXIT-CODE={}, STDOUT={}, STDERR={}",
- result.getExitCode(),
- result.getStdout(),
- result.getStdout());
-
- BACKEND_1.start();
- // BACKEND_2.start();
- // BACKEND_3.start();
- }
-
- static Network NETWORK = Network.newNetwork();
-
- static KafkaContainer KAFKA =
+ KafkaContainer kafka =
new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.4.0"))
- .withNetwork(NETWORK)
+ .withNetwork(network)
.withNetworkAliases("kafka")
.withListener(() -> "kafka:9999")
.withKraft()
.waitingFor(Wait.forLogMessage(".*Kafka\\ Server\\ started.*\\n", 1))
.withLogConsumer(new Slf4jLogConsumer(log, true).withPrefix("KAFKA"));
- static GenericContainer BACKEND_1 =
+ GenericContainer backend1 =
new GenericContainer(DockerImageName.parse("juplo/chat-backend:0.0.1-SNAPSHOT"))
.withImagePullPolicy(NEVER_PULL)
- .withNetwork(NETWORK)
+ .withNetwork(network)
.withNetworkAliases("backend-1")
.withCommand(
"--chat.backend.instance-id=backend_1",
"--chat.backend.kafka.haproxy-map=/usr/local/etc/haproxy/sharding.map"
)
.withExposedPorts(8080)
- .dependsOn(KAFKA)
.waitingFor(Wait.forLogMessage(".*Started\\ ChatBackendApplication.*\\n", 1))
.withLogConsumer(new Slf4jLogConsumer(log, true).withPrefix("BACKEND-1"));
- static GenericContainer BACKEND_2 =
+ GenericContainer backend2 =
new GenericContainer(DockerImageName.parse("juplo/chat-backend:0.0.1-SNAPSHOT"))
.withImagePullPolicy(NEVER_PULL)
- .withNetwork(NETWORK)
+ .withNetwork(network)
.withNetworkAliases("backend-2")
.withCommand(
"--chat.backend.instance-id=backend_2",
"--chat.backend.kafka.haproxy-map=/usr/local/etc/haproxy/sharding.map"
)
.withExposedPorts(8080)
- .dependsOn(KAFKA)
.waitingFor(Wait.forLogMessage(".*Started\\ ChatBackendApplication.*\\n", 1))
.withLogConsumer(new Slf4jLogConsumer(log, true).withPrefix("BACKEND-2"));
-
- static GenericContainer BACKEND_3 =
+ GenericContainer backend3 =
new GenericContainer(DockerImageName.parse("juplo/chat-backend:0.0.1-SNAPSHOT"))
.withImagePullPolicy(NEVER_PULL)
- .withNetwork(NETWORK)
+ .withNetwork(network)
.withNetworkAliases("backend-3")
.withCommand(
"--chat.backend.instance-id=backend_3",
"--chat.backend.kafka.haproxy-map=/usr/local/etc/haproxy/sharding.map"
)
.withExposedPorts(8080)
- .dependsOn(KAFKA)
.waitingFor(Wait.forLogMessage(".*Started\\ ChatBackendApplication.*\\n", 1))
.withLogConsumer(new Slf4jLogConsumer(log, true).withPrefix("BACKEND-3"));
- static GenericContainer HAPROXY =
+ GenericContainer haproxy =
new GenericContainer(DockerImageName.parse("haproxytech/haproxy-debian:2.8"))
- .withNetwork(NETWORK)
+ .withNetwork(network)
.withNetworkAliases("haproxy")
.withClasspathResourceMapping(
"haproxy.cfg",