X-Git-Url: http://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Ftest%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fchat%2Fbackend%2FKafkaHandoverIT.java;h=7c511fd985e8d3859832aaa832c715de1695bae6;hb=70faa2e9ded3107b7a54984ea3b9ff09ec794cd2;hp=6bf1e9e2c144d357ef6625e557412734eba15dc1;hpb=a219b1ab7a0341b822ba379cadec1b11ba6028c9;p=demos%2Fkafka%2Fchat diff --git a/src/test/java/de/juplo/kafka/chat/backend/KafkaHandoverIT.java b/src/test/java/de/juplo/kafka/chat/backend/KafkaHandoverIT.java index 6bf1e9e2..7c511fd9 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/KafkaHandoverIT.java +++ b/src/test/java/de/juplo/kafka/chat/backend/KafkaHandoverIT.java @@ -1,92 +1,24 @@ package de.juplo.kafka.chat.backend; import lombok.extern.slf4j.Slf4j; -import org.awaitility.Awaitility; -import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.BeforeEach; -import org.springframework.http.HttpStatus; -import org.springframework.web.reactive.function.client.WebClient; import org.testcontainers.containers.*; import org.testcontainers.containers.output.Slf4jLogConsumer; import org.testcontainers.containers.wait.strategy.Wait; import org.testcontainers.utility.DockerImageName; -import reactor.core.publisher.Mono; import java.io.IOException; -import java.time.Duration; @Slf4j class KafkaHandoverIT extends AbstractHandoverIT { - @BeforeEach - void setUpWebClient() + @Override + void setUpExtra() throws IOException, InterruptedException { - Integer port = HAPROXY.getMappedPort(8400); - webClient = WebClient.create("http://localhost:" + port); - - Awaitility - .await() - .atMost(Duration.ofMinutes(10)) - .until(() -> WebClient - .create("http://localhost:" + BACKEND_1.getMappedPort(8080)) - .get() - .uri("/actuator/health") - .exchangeToMono(response -> - { - if (response.statusCode().equals(HttpStatus.OK)) - { - return response - .bodyToMono(StatusTo.class) - .map(StatusTo::getStatus) - .map(status -> status.equalsIgnoreCase("UP")); - } - else - { - return Mono.just(false); - } - }) - .block()); - - HAPROXY - .getDockerClient() - .killContainerCmd(HAPROXY.getContainerId()) - .withSignal("HUP") - .exec(); - - - Awaitility - .await() - .atMost(Duration.ofMinutes(10)) - .until(() -> webClient - .get() - .uri("/actuator/health") - .exchangeToMono(response -> - { - if (response.statusCode().equals(HttpStatus.OK)) - { - return response - .bodyToMono(StatusTo.class) - .map(StatusTo::getStatus) - .map(status -> status.equalsIgnoreCase("UP")); - } - else - { - return Mono.just(false); - } - }) - .block()); - } - - - @BeforeAll - static void setUpDocker() throws IOException, InterruptedException - { - KAFKA.start(); - HAPROXY.start(); + kafka.start(); Container.ExecResult result; - result = KAFKA.execInContainer( + result = kafka.execInContainer( "kafka-topics", "--bootstrap-server", "kafka:9999", @@ -100,7 +32,7 @@ class KafkaHandoverIT extends AbstractHandoverIT result.getExitCode(), result.getStdout(), result.getStdout()); - result = KAFKA.execInContainer( + result = kafka.execInContainer( "kafka-topics", "--bootstrap-server", "kafka:9999", @@ -114,96 +46,63 @@ class KafkaHandoverIT extends AbstractHandoverIT result.getExitCode(), result.getStdout(), result.getStdout()); - - BACKEND_1.start(); - // BACKEND_2.start(); - // BACKEND_3.start(); } - static Network NETWORK = Network.newNetwork(); - static KafkaContainer KAFKA = + KafkaContainer kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.4.0")) - .withNetwork(NETWORK) + .withNetwork(network) .withNetworkAliases("kafka") .withListener(() -> "kafka:9999") .withKraft() .waitingFor(Wait.forLogMessage(".*Kafka\\ Server\\ started.*\\n", 1)) .withLogConsumer(new Slf4jLogConsumer(log, true).withPrefix("KAFKA")); - static GenericContainer BACKEND_1 = - new GenericContainer(DockerImageName.parse("juplo/chat-backend:0.0.1-SNAPSHOT")) - .withImagePullPolicy(NEVER_PULL) - .withNetwork(NETWORK) - .withNetworkAliases("backend-1") - .withCommand( - "--chat.backend.instance-id=backend-1", - "--chat.backend.services=kafka", - "--chat.backend.kafka.bootstrap-servers=kafka:9999", - "--chat.backend.kafka.instance-uri=http://backend-1:8080", - "--chat.backend.kafka.num-partitions=10", - "--chat.backend.kafka.client-id-prefix=B1", - "--chat.backend.kafka.haproxy-runtime-api=haproxy:8401", - "--chat.backend.kafka.haproxy-map=/usr/local/etc/haproxy/sharding.map" - ) - .withExposedPorts(8080) - .dependsOn(KAFKA) - .waitingFor(Wait.forLogMessage(".*Started\\ ChatBackendApplication.*\\n", 1)) - .withLogConsumer(new Slf4jLogConsumer(log, true).withPrefix("BACKEND-1")); - - static GenericContainer BACKEND_2 = - new GenericContainer(DockerImageName.parse("juplo/chat-backend:0.0.1-SNAPSHOT")) - .withImagePullPolicy(NEVER_PULL) - .withNetwork(NETWORK) - .withNetworkAliases("backend-2") - .withCommand( - "--chat.backend.instance-id=backend-2", - "--chat.backend.services=kafka", - "--chat.backend.kafka.bootstrap-servers=kafka:9999", - "--chat.backend.kafka.instance-uri=http://backend-2:8080", - "--chat.backend.kafka.num-partitions=10", - "--chat.backend.kafka.client-id-prefix=B2", - "--chat.backend.kafka.haproxy-runtime-api=haproxy:8401", - "--chat.backend.kafka.haproxy-map=/usr/local/etc/haproxy/sharding.map" - ) - .withExposedPorts(8080) - .dependsOn(KAFKA) - .waitingFor(Wait.forLogMessage(".*Started\\ ChatBackendApplication.*\\n", 1)) - .withLogConsumer(new Slf4jLogConsumer(log, true).withPrefix("BACKEND-2")); - + @Override + String[] getCommandBackend1() + { + return new String[] + { + "--chat.backend.instance-id=backend-1", + "--chat.backend.services=kafka", + "--chat.backend.kafka.bootstrap-servers=kafka:9999", + "--chat.backend.kafka.instance-uri=http://backend-1:8080", + "--chat.backend.kafka.num-partitions=10", + "--chat.backend.kafka.client-id-prefix=B1", + "--chat.backend.kafka.haproxy-runtime-api=haproxy:8401", + "--chat.backend.kafka.haproxy-map=/usr/local/etc/haproxy/sharding.map" + }; + } - static GenericContainer BACKEND_3 = - new GenericContainer(DockerImageName.parse("juplo/chat-backend:0.0.1-SNAPSHOT")) - .withImagePullPolicy(NEVER_PULL) - .withNetwork(NETWORK) - .withNetworkAliases("backend-3") - .withCommand( - "--chat.backend.instance-id=backend-3", - "--chat.backend.services=kafka", - "--chat.backend.kafka.bootstrap-servers=kafka:9999", - "--chat.backend.kafka.instance-uri=http://backend-3:8080", - "--chat.backend.kafka.num-partitions=10", - "--chat.backend.kafka.client-id-prefix=B3", - "--chat.backend.kafka.haproxy-runtime-api=haproxy:8401", - "--chat.backend.kafka.haproxy-map=/usr/local/etc/haproxy/sharding.map" - ) - .withExposedPorts(8080) - .dependsOn(KAFKA) - .waitingFor(Wait.forLogMessage(".*Started\\ ChatBackendApplication.*\\n", 1)) - .withLogConsumer(new Slf4jLogConsumer(log, true).withPrefix("BACKEND-3")); + @Override + String[] getCommandBackend2() + { + return new String[] + { + "--chat.backend.instance-id=backend-2", + "--chat.backend.services=kafka", + "--chat.backend.kafka.bootstrap-servers=kafka:9999", + "--chat.backend.kafka.instance-uri=http://backend-2:8080", + "--chat.backend.kafka.num-partitions=10", + "--chat.backend.kafka.client-id-prefix=B2", + "--chat.backend.kafka.haproxy-runtime-api=haproxy:8401", + "--chat.backend.kafka.haproxy-map=/usr/local/etc/haproxy/sharding.map" + }; + } - static GenericContainer HAPROXY = - new GenericContainer(DockerImageName.parse("haproxytech/haproxy-debian:2.8")) - .withNetwork(NETWORK) - .withNetworkAliases("haproxy") - .withClasspathResourceMapping( - "haproxy.cfg", - "/usr/local/etc/haproxy/haproxy.cfg", - BindMode.READ_ONLY) - .withClasspathResourceMapping( - "sharding.map", - "/usr/local/etc/haproxy/sharding.map", - BindMode.READ_WRITE) - .withExposedPorts(8400, 8401, 8404) - .withLogConsumer(new Slf4jLogConsumer(log, true).withPrefix("HAPROXY")); + @Override + String[] getCommandBackend3() + { + return new String[] + { + "--chat.backend.instance-id=backend-3", + "--chat.backend.services=kafka", + "--chat.backend.kafka.bootstrap-servers=kafka:9999", + "--chat.backend.kafka.instance-uri=http://backend-3:8080", + "--chat.backend.kafka.num-partitions=10", + "--chat.backend.kafka.client-id-prefix=B3", + "--chat.backend.kafka.haproxy-runtime-api=haproxy:8401", + "--chat.backend.kafka.haproxy-map=/usr/local/etc/haproxy/sharding.map" + }; + } }