From: Kai Moritz Date: Sat, 24 Feb 2024 11:50:34 +0000 (+0100) Subject: WIP:setup X-Git-Tag: rebase--2024-02-26--19-46~19 X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=dcef51760a52354121910e1bd8d31affd2e3a3c3;p=demos%2Fkafka%2Fchat WIP:setup --- diff --git a/src/test/java/de/juplo/kafka/chat/backend/KafkaHandoverIT.java b/src/test/java/de/juplo/kafka/chat/backend/KafkaHandoverIT.java index 6256d706..7b338035 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/KafkaHandoverIT.java +++ b/src/test/java/de/juplo/kafka/chat/backend/KafkaHandoverIT.java @@ -6,23 +6,20 @@ import lombok.EqualsAndHashCode; import lombok.Getter; import lombok.ToString; import lombok.extern.slf4j.Slf4j; +import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.springframework.http.HttpStatus; import org.springframework.http.MediaType; import org.springframework.web.reactive.function.client.WebClient; -import org.testcontainers.containers.BindMode; -import org.testcontainers.containers.GenericContainer; -import org.testcontainers.containers.KafkaContainer; -import org.testcontainers.containers.Network; +import org.testcontainers.containers.*; import org.testcontainers.containers.output.Slf4jLogConsumer; import org.testcontainers.containers.wait.strategy.Wait; -import org.testcontainers.images.builder.Transferable; -import org.testcontainers.junit.jupiter.Container; import org.testcontainers.utility.DockerImageName; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import java.io.IOException; import java.util.stream.IntStream; @@ -103,43 +100,72 @@ class KafkaHandoverIT extends AbstractHandoverIT } @BeforeEach - void setUp() + void setUpWebClient() { - Integer port = haproxy.getMappedPort(8400); + Integer port = HAPROXY.getMappedPort(8400); webClient = WebClient.create("http://localhost:" + port); } - WebClient webClient; - Network network = Network.newNetwork(); - @Container - KafkaContainer kafka = + @BeforeAll + static void setUpDocker() throws IOException, InterruptedException + { + KAFKA.start(); + + Container.ExecResult result; + result = KAFKA.execInContainer( + "kafka-topics", + "--bootstrap-server", + "kafka:9999", + "--create", + "--topic", + "info_channel", + "--partitions", + "3"); + log.info( + "EXIT-CODE={}, STDOUT={}, STDERR={}", + result.getExitCode(), + result.getStdout(), + result.getStdout()); + result = KAFKA.execInContainer( + "kafka-topics", + "--bootstrap-server", + "kafka:9999", + "--create", + "--topic", + "data_channel", + "--partitions", + "10"); + log.info( + "EXIT-CODE={}, STDOUT={}, STDERR={}", + result.getExitCode(), + result.getStdout(), + result.getStdout()); + + BACKEND_1.start(); + BACKEND_2.start(); + BACKEND_3.start(); + + HAPROXY.start(); + } + + static Network NETWORK = Network.newNetwork(); + + static KafkaContainer KAFKA = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.4.0")) - .withNetwork(network) + .withNetwork(NETWORK) .withNetworkAliases("kafka") .withListener(() -> "kafka:9999") .withKraft() .waitingFor(Wait.forLogMessage(".*Kafka\\ Server\\ started.*\\n", 1)) .withLogConsumer(new Slf4jLogConsumer(log, true).withPrefix("KAFKA")); - @Container - GenericContainer setup = - new GenericContainer<>("confluentinc/cp-kcat:7.4.0") - .withCreateContainerCmdModifier(cmd -> cmd.withEntrypoint("sh", "-c")) - .withNetwork(network) - .withCommand( - "kafka-topics --bootstrap-server kafka:9999 --create --topic info_channel --partitions 3; " + - "kafka-topics --bootstrap-server kafka:9999 --create --topic data_channel --partitions 10") - .dependsOn(kafka) - .withLogConsumer(new Slf4jLogConsumer(log, true).withPrefix("SETUP")); - - @Container - GenericContainer backend_1 = + static GenericContainer BACKEND_1 = new GenericContainer(DockerImageName.parse("juplo/chat-backend:0.0.1-SNAPSHOT")) .withImagePullPolicy(NEVER_PULL) - .withNetwork(network) + .withNetwork(NETWORK) .withNetworkAliases("backend-1") .withCommand( "--chat.backend.instance-id=backend-1", @@ -151,15 +177,14 @@ class KafkaHandoverIT extends AbstractHandoverIT "--chat.backend.kafka.haproxy-runtime-api=haproxy:8401", "--chat.backend.kafka.haproxy-map=/usr/local/etc/haproxy/sharding.map" ) - .dependsOn(setup) + .dependsOn(KAFKA) .waitingFor(Wait.forLogMessage(".*Started\\ ChatBackendApplication.*\\n", 1)) .withLogConsumer(new Slf4jLogConsumer(log, true).withPrefix("BACKEND-1")); - @Container - GenericContainer backend_2 = + static GenericContainer BACKEND_2 = new GenericContainer(DockerImageName.parse("juplo/chat-backend:0.0.1-SNAPSHOT")) .withImagePullPolicy(NEVER_PULL) - .withNetwork(network) + .withNetwork(NETWORK) .withNetworkAliases("backend-2") .withCommand( "--chat.backend.instance-id=backend-2", @@ -171,15 +196,15 @@ class KafkaHandoverIT extends AbstractHandoverIT "--chat.backend.kafka.haproxy-runtime-api=haproxy:8401", "--chat.backend.kafka.haproxy-map=/usr/local/etc/haproxy/sharding.map" ) - .dependsOn(setup) + .dependsOn(KAFKA) .waitingFor(Wait.forLogMessage(".*Started\\ ChatBackendApplication.*\\n", 1)) .withLogConsumer(new Slf4jLogConsumer(log, true).withPrefix("BACKEND-2")); - @Container - GenericContainer backend_3 = + + static GenericContainer BACKEND_3 = new GenericContainer(DockerImageName.parse("juplo/chat-backend:0.0.1-SNAPSHOT")) .withImagePullPolicy(NEVER_PULL) - .withNetwork(network) + .withNetwork(NETWORK) .withNetworkAliases("backend-3") .withCommand( "--chat.backend.instance-id=backend-3", @@ -191,14 +216,13 @@ class KafkaHandoverIT extends AbstractHandoverIT "--chat.backend.kafka.haproxy-runtime-api=haproxy:8401", "--chat.backend.kafka.haproxy-map=/usr/local/etc/haproxy/sharding.map" ) - .dependsOn(setup) + .dependsOn(KAFKA) .waitingFor(Wait.forLogMessage(".*Started\\ ChatBackendApplication.*\\n", 1)) .withLogConsumer(new Slf4jLogConsumer(log, true).withPrefix("BACKEND-3")); - @Container - GenericContainer haproxy = + static GenericContainer HAPROXY = new GenericContainer(DockerImageName.parse("haproxytech/haproxy-debian:2.8")) - .withNetwork(network) + .withNetwork(NETWORK) .withNetworkAliases("haproxy") .withClasspathResourceMapping( "haproxy.cfg", @@ -209,7 +233,7 @@ class KafkaHandoverIT extends AbstractHandoverIT "/usr/local/etc/haproxy/sharding.map", BindMode.READ_WRITE) .withExposedPorts(8400) // , 8401, 8404, 5555) - .dependsOn(backend_1, backend_2, backend_3) + .dependsOn(BACKEND_1, BACKEND_2, BACKEND_3) .withLogConsumer(new Slf4jLogConsumer(log, true).withPrefix("HAPROXY")); @EqualsAndHashCode