From 9b24747d04dd6d595030c1f9d1bf92e3020f1c73 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Tue, 27 Feb 2024 09:39:56 +0100 Subject: [PATCH] test: HandoverIT-POC - splitted up code into smaller classes -- ALIGN --- .../chat/backend/AbstractHandoverIT.java | 212 ++------------ .../backend/AbstractHandoverITContainers.java | 157 +++------- .../kafka/chat/backend/KafkaHandoverIT.java | 99 +------ .../backend/KafkaHandoverITContainers.java | 46 +-- .../de/juplo/kafka/chat/backend/StatusTo.java | 269 +----------------- .../juplo/kafka/chat/backend/TestClient.java | 224 +-------------- .../de/juplo/kafka/chat/backend/User.java | 267 +---------------- 7 files changed, 96 insertions(+), 1178 deletions(-) diff --git a/src/test/java/de/juplo/kafka/chat/backend/AbstractHandoverIT.java b/src/test/java/de/juplo/kafka/chat/backend/AbstractHandoverIT.java index f384f7e8..2e4faa1b 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/AbstractHandoverIT.java +++ b/src/test/java/de/juplo/kafka/chat/backend/AbstractHandoverIT.java @@ -1,13 +1,7 @@ package de.juplo.kafka.chat.backend; import de.juplo.kafka.chat.backend.api.ChatRoomInfoTo; -import de.juplo.kafka.chat.backend.api.MessageTo; -import lombok.EqualsAndHashCode; -import lombok.Getter; -import lombok.Setter; -import lombok.ToString; import lombok.extern.slf4j.Slf4j; -import org.awaitility.Awaitility; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.springframework.core.ParameterizedTypeReference; @@ -15,44 +9,36 @@ import org.springframework.http.HttpStatus; import org.springframework.http.MediaType; import org.springframework.http.codec.ServerSentEvent; import org.springframework.web.reactive.function.client.WebClient; -import org.testcontainers.containers.BindMode; -import org.testcontainers.containers.GenericContainer; -import org.testcontainers.containers.Network; -import org.testcontainers.containers.output.Slf4jLogConsumer; -import org.testcontainers.containers.wait.strategy.Wait; -import org.testcontainers.images.ImagePullPolicy; import org.testcontainers.junit.jupiter.Testcontainers; -import org.testcontainers.utility.DockerImageName; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; -import reactor.util.retry.Retry; - -import java.io.IOException; -import java.time.Duration; -import java.util.stream.IntStream; @Testcontainers @Slf4j public abstract class AbstractHandoverIT { - static final ImagePullPolicy NEVER_PULL = imageName -> false; static final ParameterizedTypeReference> SSE_TYPE = new ParameterizedTypeReference<>() {}; + private final AbstractHandoverITContainers containers; + + + AbstractHandoverIT(AbstractHandoverITContainers containers) + { + this.containers = containers; + } + + @Test void test() throws InterruptedException { ChatRoomInfoTo chatRoom = createChatRoom("bar").block(); - User user = new User("nerd"); - IntStream - .rangeClosed(1,100) - .mapToObj(i ->sendMessage(chatRoom, user, "Message #" + i)) - .map(result -> result - .map(MessageTo::toString) - .retryWhen(Retry.fixedDelay(10, Duration.ofSeconds(1))) - .block()) - .forEach(result -> log.info("{}", result)); + TestClient testClient = new TestClient( + containers.haproxy.getMappedPort(8400), + chatRoom, + "nerd"); + testClient.run(); receiveMessages(chatRoom) .take(100) @@ -82,34 +68,6 @@ public abstract class AbstractHandoverIT }); } - Mono sendMessage( - ChatRoomInfoTo chatRoom, - User user, - String message) - { - return webClient - .put() - .uri( - "/{chatRoomId}/{username}/{serial}", - chatRoom.getId(), - user.getName(), - user.nextSerial()) - .contentType(MediaType.TEXT_PLAIN) - .accept(MediaType.APPLICATION_JSON) - .bodyValue(message) - .exchangeToMono(response -> - { - if (response.statusCode().equals(HttpStatus.OK)) - { - return response.bodyToMono(MessageTo.class); - } - else - { - return response.createError(); - } - }); - } - Flux> receiveMessages(ChatRoomInfoTo chatRoom) { return webClient @@ -125,150 +83,12 @@ public abstract class AbstractHandoverIT WebClient webClient; - - abstract void setUpExtra() throws IOException, InterruptedException; - @BeforeEach void setUp() throws Exception { - setUpExtra(); - haproxy.start(); - backend1.start(); - // backend2.start(); - // backend3.start(); + containers.setUp(); - Integer port = haproxy.getMappedPort(8400); + Integer port = containers.haproxy.getMappedPort(8400); webClient = WebClient.create("http://localhost:" + port); - - Awaitility - .await() - .atMost(Duration.ofMinutes(10)) - .until(() -> WebClient - .create("http://localhost:" + backend1.getMappedPort(8080)) - .get() - .uri("/actuator/health") - .exchangeToMono(response -> - { - if (response.statusCode().equals(HttpStatus.OK)) - { - return response - .bodyToMono(StatusTo.class) - .map(StatusTo::getStatus) - .map(status -> status.equalsIgnoreCase("UP")); - } - else - { - return Mono.just(false); - } - }) - .block()); - - haproxy - .getDockerClient() - .killContainerCmd(haproxy.getContainerId()) - .withSignal("HUP") - .exec(); - - - Awaitility - .await() - .atMost(Duration.ofMinutes(10)) - .until(() -> webClient - .get() - .uri("/actuator/health") - .exchangeToMono(response -> - { - if (response.statusCode().equals(HttpStatus.OK)) - { - return response - .bodyToMono(StatusTo.class) - .map(StatusTo::getStatus) - .map(status -> status.equalsIgnoreCase("UP")); - } - else - { - return Mono.just(false); - } - }) - .block()); - } - - Network network = Network.newNetwork(); - - GenericContainer haproxy = - new GenericContainer(DockerImageName.parse("haproxytech/haproxy-debian:2.8")) - .withNetwork(network) - .withNetworkAliases("haproxy") - .withClasspathResourceMapping( - "haproxy.cfg", - "/usr/local/etc/haproxy/haproxy.cfg", - BindMode.READ_ONLY) - .withClasspathResourceMapping( - "sharding.map", - "/usr/local/etc/haproxy/sharding.map", - BindMode.READ_WRITE) - .withExposedPorts(8400, 8401, 8404) - .withLogConsumer(new Slf4jLogConsumer(log, true).withPrefix("HAPROXY")); - - abstract String[] getCommandBackend1(); - GenericContainer backend1 = - new GenericContainer(DockerImageName.parse("juplo/chat-backend:0.0.1-SNAPSHOT")) - .withImagePullPolicy(NEVER_PULL) - .withNetwork(network) - .withNetworkAliases("backend-1") - .withCommand(getCommandBackend1()) - .withExposedPorts(8080) - .waitingFor(Wait.forLogMessage(".*Started\\ ChatBackendApplication.*\\n", 1)) - .withLogConsumer(new Slf4jLogConsumer(log, true).withPrefix("BACKEND-1")); - - abstract String[] getCommandBackend2(); - GenericContainer backend2 = - new GenericContainer(DockerImageName.parse("juplo/chat-backend:0.0.1-SNAPSHOT")) - .withImagePullPolicy(NEVER_PULL) - .withNetwork(network) - .withNetworkAliases("backend-2") - .withCommand(getCommandBackend2()) - .withExposedPorts(8080) - .waitingFor(Wait.forLogMessage(".*Started\\ ChatBackendApplication.*\\n", 1)) - .withLogConsumer(new Slf4jLogConsumer(log, true).withPrefix("BACKEND-2")); - - abstract String[] getCommandBackend3(); - GenericContainer backend3 = - new GenericContainer(DockerImageName.parse("juplo/chat-backend:0.0.1-SNAPSHOT")) - .withImagePullPolicy(NEVER_PULL) - .withNetwork(network) - .withNetworkAliases("backend-3") - .withCommand(getCommandBackend3()) - .withExposedPorts(8080) - .waitingFor(Wait.forLogMessage(".*Started\\ ChatBackendApplication.*\\n", 1)) - .withLogConsumer(new Slf4jLogConsumer(log, true).withPrefix("BACKEND-3")); - - - @EqualsAndHashCode - @ToString - class User - { - @Getter - private final String name; - private int serial = 0; - - - User (String name) - { - this.name = name; - } - - - int nextSerial() - { - return ++serial; - } - } - - @Getter - @Setter - static class StatusTo - { - String status; } } diff --git a/src/test/java/de/juplo/kafka/chat/backend/AbstractHandoverITContainers.java b/src/test/java/de/juplo/kafka/chat/backend/AbstractHandoverITContainers.java index b22166fa..4e19defd 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/AbstractHandoverITContainers.java +++ b/src/test/java/de/juplo/kafka/chat/backend/AbstractHandoverITContainers.java @@ -1,19 +1,8 @@ package de.juplo.kafka.chat.backend; -import de.juplo.kafka.chat.backend.api.ChatRoomInfoTo; -import de.juplo.kafka.chat.backend.api.MessageTo; -import lombok.EqualsAndHashCode; -import lombok.Getter; -import lombok.Setter; -import lombok.ToString; import lombok.extern.slf4j.Slf4j; import org.awaitility.Awaitility; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.springframework.core.ParameterizedTypeReference; import org.springframework.http.HttpStatus; -import org.springframework.http.MediaType; -import org.springframework.http.codec.ServerSentEvent; import org.springframework.web.reactive.function.client.WebClient; import org.testcontainers.containers.BindMode; import org.testcontainers.containers.GenericContainer; @@ -21,49 +10,37 @@ import org.testcontainers.containers.Network; import org.testcontainers.containers.output.Slf4jLogConsumer; import org.testcontainers.containers.wait.strategy.Wait; import org.testcontainers.images.ImagePullPolicy; -import org.testcontainers.junit.jupiter.Testcontainers; import org.testcontainers.utility.DockerImageName; -import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; -import reactor.util.retry.Retry; -import java.io.IOException; import java.time.Duration; -import java.util.stream.IntStream; +import java.util.Arrays; -@Testcontainers @Slf4j -public abstract class AbstractHandoverIT +public abstract class AbstractHandoverITContainers { static final ImagePullPolicy NEVER_PULL = imageName -> false; - @Test - void test() throws InterruptedException + final Network network = Network.newNetwork(); + final GenericContainer haproxy, backend1, backend2, backend3; + + + AbstractHandoverITContainers() { - ChatRoomInfoTo chatRoom = createChatRoom("bar").block(); - User user = new User("nerd"); - IntStream - .rangeClosed(1,100) - .mapToObj(i ->sendMessage(chatRoom, user, "Message #" + i)) - .map(result -> result - .map(MessageTo::toString) - .retryWhen(Retry.fixedDelay(10, Duration.ofSeconds(1))) - .block()) - .forEach(result -> log.info("{}", result)); - - receiveMessages(chatRoom) - .take(100) - .doOnNext(message -> log.info("message: {}", message)) - .then() - .block(); + haproxy = createHaproxyContainer(); + backend1 = createBackendContainer("1"); + backend2 = createBackendContainer("2"); + backend3 = createBackendContainer("3"); } - abstract void setUpExtra() throws IOException, InterruptedException; + void setUpExtra() throws Exception + { + log.info("This setup does not need any extra containers"); + } - @BeforeEach void setUp() throws Exception { setUpExtra(); @@ -126,82 +103,40 @@ public abstract class AbstractHandoverIT .block()); } - Network network = Network.newNetwork(); - - GenericContainer haproxy = - new GenericContainer(DockerImageName.parse("haproxytech/haproxy-debian:2.8")) - .withNetwork(network) - .withNetworkAliases("haproxy") - .withClasspathResourceMapping( - "haproxy.cfg", - "/usr/local/etc/haproxy/haproxy.cfg", - BindMode.READ_ONLY) - .withClasspathResourceMapping( - "sharding.map", - "/usr/local/etc/haproxy/sharding.map", - BindMode.READ_WRITE) - .withExposedPorts(8400, 8401, 8404) - .withLogConsumer(new Slf4jLogConsumer(log, true).withPrefix("HAPROXY")); - - abstract String[] getCommandBackend1(); - GenericContainer backend1 = - new GenericContainer(DockerImageName.parse("juplo/chat-backend:0.0.1-SNAPSHOT")) - .withImagePullPolicy(NEVER_PULL) - .withNetwork(network) - .withNetworkAliases("backend-1") - .withCommand(getCommandBackend1()) - .withExposedPorts(8080) - .waitingFor(Wait.forLogMessage(".*Started\\ ChatBackendApplication.*\\n", 1)) - .withLogConsumer(new Slf4jLogConsumer(log, true).withPrefix("BACKEND-1")); - - abstract String[] getCommandBackend2(); - GenericContainer backend2 = - new GenericContainer(DockerImageName.parse("juplo/chat-backend:0.0.1-SNAPSHOT")) - .withImagePullPolicy(NEVER_PULL) - .withNetwork(network) - .withNetworkAliases("backend-2") - .withCommand(getCommandBackend2()) - .withExposedPorts(8080) - .waitingFor(Wait.forLogMessage(".*Started\\ ChatBackendApplication.*\\n", 1)) - .withLogConsumer(new Slf4jLogConsumer(log, true).withPrefix("BACKEND-2")); - - abstract String[] getCommandBackend3(); - GenericContainer backend3 = - new GenericContainer(DockerImageName.parse("juplo/chat-backend:0.0.1-SNAPSHOT")) - .withImagePullPolicy(NEVER_PULL) - .withNetwork(network) - .withNetworkAliases("backend-3") - .withCommand(getCommandBackend3()) - .withExposedPorts(8080) - .waitingFor(Wait.forLogMessage(".*Started\\ ChatBackendApplication.*\\n", 1)) - .withLogConsumer(new Slf4jLogConsumer(log, true).withPrefix("BACKEND-3")); - - - @EqualsAndHashCode - @ToString - class User - { - @Getter - private final String name; - private int serial = 0; + abstract String[] getBackendCommand(); - - User (String name) - { - this.name = name; - } - - - int nextSerial() - { - return ++serial; - } + final GenericContainer createHaproxyContainer() + { + return new GenericContainer(DockerImageName.parse("haproxytech/haproxy-debian:2.8")) + .withNetwork(network) + .withNetworkAliases("haproxy") + .withClasspathResourceMapping( + "haproxy.cfg", + "/usr/local/etc/haproxy/haproxy.cfg", + BindMode.READ_ONLY) + .withClasspathResourceMapping( + "sharding.map", + "/usr/local/etc/haproxy/sharding.map", + BindMode.READ_WRITE) + .withExposedPorts(8400, 8401, 8404) + .withLogConsumer(new Slf4jLogConsumer(log, true).withPrefix("HAPROXY")); } - @Getter - @Setter - static class StatusTo + final GenericContainer createBackendContainer(String id) { - String status; + return new GenericContainer(DockerImageName.parse("juplo/chat-backend:0.0.1-SNAPSHOT")) + .withImagePullPolicy(NEVER_PULL) + .withNetwork(network) + .withNetworkAliases("backend-ID".replaceAll("ID", id)) + .withCommand(Arrays.stream(getBackendCommand()) + .map(commandPart -> commandPart.replaceAll("ID", id)) + .toArray(size -> new String[size])) + .withExposedPorts(8080) + .waitingFor(Wait.forLogMessage(".*Started\\ ChatBackendApplication.*\\n", 1)) + .withLogConsumer(new Slf4jLogConsumer( + log, + true + ) + .withPrefix("BACKEND-ID".replaceAll("ID", id))); } } diff --git a/src/test/java/de/juplo/kafka/chat/backend/KafkaHandoverIT.java b/src/test/java/de/juplo/kafka/chat/backend/KafkaHandoverIT.java index 7c511fd9..0a6c75d8 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/KafkaHandoverIT.java +++ b/src/test/java/de/juplo/kafka/chat/backend/KafkaHandoverIT.java @@ -1,108 +1,13 @@ package de.juplo.kafka.chat.backend; import lombok.extern.slf4j.Slf4j; -import org.testcontainers.containers.*; -import org.testcontainers.containers.output.Slf4jLogConsumer; -import org.testcontainers.containers.wait.strategy.Wait; -import org.testcontainers.utility.DockerImageName; - -import java.io.IOException; @Slf4j class KafkaHandoverIT extends AbstractHandoverIT { - @Override - void setUpExtra() throws IOException, InterruptedException - { - kafka.start(); - - Container.ExecResult result; - result = kafka.execInContainer( - "kafka-topics", - "--bootstrap-server", - "kafka:9999", - "--create", - "--topic", - "info_channel", - "--partitions", - "3"); - log.info( - "EXIT-CODE={}, STDOUT={}, STDERR={}", - result.getExitCode(), - result.getStdout(), - result.getStdout()); - result = kafka.execInContainer( - "kafka-topics", - "--bootstrap-server", - "kafka:9999", - "--create", - "--topic", - "data_channel", - "--partitions", - "10"); - log.info( - "EXIT-CODE={}, STDOUT={}, STDERR={}", - result.getExitCode(), - result.getStdout(), - result.getStdout()); - } - - - KafkaContainer kafka = - new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.4.0")) - .withNetwork(network) - .withNetworkAliases("kafka") - .withListener(() -> "kafka:9999") - .withKraft() - .waitingFor(Wait.forLogMessage(".*Kafka\\ Server\\ started.*\\n", 1)) - .withLogConsumer(new Slf4jLogConsumer(log, true).withPrefix("KAFKA")); - - @Override - String[] getCommandBackend1() - { - return new String[] - { - "--chat.backend.instance-id=backend-1", - "--chat.backend.services=kafka", - "--chat.backend.kafka.bootstrap-servers=kafka:9999", - "--chat.backend.kafka.instance-uri=http://backend-1:8080", - "--chat.backend.kafka.num-partitions=10", - "--chat.backend.kafka.client-id-prefix=B1", - "--chat.backend.kafka.haproxy-runtime-api=haproxy:8401", - "--chat.backend.kafka.haproxy-map=/usr/local/etc/haproxy/sharding.map" - }; - } - - @Override - String[] getCommandBackend2() - { - return new String[] - { - "--chat.backend.instance-id=backend-2", - "--chat.backend.services=kafka", - "--chat.backend.kafka.bootstrap-servers=kafka:9999", - "--chat.backend.kafka.instance-uri=http://backend-2:8080", - "--chat.backend.kafka.num-partitions=10", - "--chat.backend.kafka.client-id-prefix=B2", - "--chat.backend.kafka.haproxy-runtime-api=haproxy:8401", - "--chat.backend.kafka.haproxy-map=/usr/local/etc/haproxy/sharding.map" - }; - } - - @Override - String[] getCommandBackend3() + KafkaHandoverIT() { - return new String[] - { - "--chat.backend.instance-id=backend-3", - "--chat.backend.services=kafka", - "--chat.backend.kafka.bootstrap-servers=kafka:9999", - "--chat.backend.kafka.instance-uri=http://backend-3:8080", - "--chat.backend.kafka.num-partitions=10", - "--chat.backend.kafka.client-id-prefix=B3", - "--chat.backend.kafka.haproxy-runtime-api=haproxy:8401", - "--chat.backend.kafka.haproxy-map=/usr/local/etc/haproxy/sharding.map" - }; + super(new KafkaHandoverITContainers()); } } diff --git a/src/test/java/de/juplo/kafka/chat/backend/KafkaHandoverITContainers.java b/src/test/java/de/juplo/kafka/chat/backend/KafkaHandoverITContainers.java index 7c511fd9..3c4c90fa 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/KafkaHandoverITContainers.java +++ b/src/test/java/de/juplo/kafka/chat/backend/KafkaHandoverITContainers.java @@ -1,7 +1,8 @@ package de.juplo.kafka.chat.backend; import lombok.extern.slf4j.Slf4j; -import org.testcontainers.containers.*; +import org.testcontainers.containers.Container; +import org.testcontainers.containers.KafkaContainer; import org.testcontainers.containers.output.Slf4jLogConsumer; import org.testcontainers.containers.wait.strategy.Wait; import org.testcontainers.utility.DockerImageName; @@ -10,7 +11,7 @@ import java.io.IOException; @Slf4j -class KafkaHandoverIT extends AbstractHandoverIT +class KafkaHandoverITContainers extends AbstractHandoverITContainers { @Override void setUpExtra() throws IOException, InterruptedException @@ -49,7 +50,7 @@ class KafkaHandoverIT extends AbstractHandoverIT } - KafkaContainer kafka = + private final KafkaContainer kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.4.0")) .withNetwork(network) .withNetworkAliases("kafka") @@ -58,49 +59,18 @@ class KafkaHandoverIT extends AbstractHandoverIT .waitingFor(Wait.forLogMessage(".*Kafka\\ Server\\ started.*\\n", 1)) .withLogConsumer(new Slf4jLogConsumer(log, true).withPrefix("KAFKA")); - @Override - String[] getCommandBackend1() - { - return new String[] - { - "--chat.backend.instance-id=backend-1", - "--chat.backend.services=kafka", - "--chat.backend.kafka.bootstrap-servers=kafka:9999", - "--chat.backend.kafka.instance-uri=http://backend-1:8080", - "--chat.backend.kafka.num-partitions=10", - "--chat.backend.kafka.client-id-prefix=B1", - "--chat.backend.kafka.haproxy-runtime-api=haproxy:8401", - "--chat.backend.kafka.haproxy-map=/usr/local/etc/haproxy/sharding.map" - }; - } - - @Override - String[] getCommandBackend2() - { - return new String[] - { - "--chat.backend.instance-id=backend-2", - "--chat.backend.services=kafka", - "--chat.backend.kafka.bootstrap-servers=kafka:9999", - "--chat.backend.kafka.instance-uri=http://backend-2:8080", - "--chat.backend.kafka.num-partitions=10", - "--chat.backend.kafka.client-id-prefix=B2", - "--chat.backend.kafka.haproxy-runtime-api=haproxy:8401", - "--chat.backend.kafka.haproxy-map=/usr/local/etc/haproxy/sharding.map" - }; - } @Override - String[] getCommandBackend3() + String[] getBackendCommand() { return new String[] { - "--chat.backend.instance-id=backend-3", + "--chat.backend.instance-id=backend-ID", "--chat.backend.services=kafka", "--chat.backend.kafka.bootstrap-servers=kafka:9999", - "--chat.backend.kafka.instance-uri=http://backend-3:8080", + "--chat.backend.kafka.instance-uri=http://backend-ID:8080", "--chat.backend.kafka.num-partitions=10", - "--chat.backend.kafka.client-id-prefix=B3", + "--chat.backend.kafka.client-id-prefix=BID", "--chat.backend.kafka.haproxy-runtime-api=haproxy:8401", "--chat.backend.kafka.haproxy-map=/usr/local/etc/haproxy/sharding.map" }; diff --git a/src/test/java/de/juplo/kafka/chat/backend/StatusTo.java b/src/test/java/de/juplo/kafka/chat/backend/StatusTo.java index f384f7e8..f6209429 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/StatusTo.java +++ b/src/test/java/de/juplo/kafka/chat/backend/StatusTo.java @@ -1,274 +1,13 @@ package de.juplo.kafka.chat.backend; -import de.juplo.kafka.chat.backend.api.ChatRoomInfoTo; -import de.juplo.kafka.chat.backend.api.MessageTo; -import lombok.EqualsAndHashCode; import lombok.Getter; import lombok.Setter; -import lombok.ToString; -import lombok.extern.slf4j.Slf4j; -import org.awaitility.Awaitility; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.springframework.core.ParameterizedTypeReference; -import org.springframework.http.HttpStatus; -import org.springframework.http.MediaType; -import org.springframework.http.codec.ServerSentEvent; -import org.springframework.web.reactive.function.client.WebClient; -import org.testcontainers.containers.BindMode; -import org.testcontainers.containers.GenericContainer; -import org.testcontainers.containers.Network; -import org.testcontainers.containers.output.Slf4jLogConsumer; -import org.testcontainers.containers.wait.strategy.Wait; -import org.testcontainers.images.ImagePullPolicy; -import org.testcontainers.junit.jupiter.Testcontainers; -import org.testcontainers.utility.DockerImageName; -import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; -import reactor.util.retry.Retry; -import java.io.IOException; -import java.time.Duration; -import java.util.stream.IntStream; -@Testcontainers -@Slf4j -public abstract class AbstractHandoverIT +@Getter +@Setter +class StatusTo { - static final ImagePullPolicy NEVER_PULL = imageName -> false; - static final ParameterizedTypeReference> SSE_TYPE = new ParameterizedTypeReference<>() {}; - - - @Test - void test() throws InterruptedException - { - ChatRoomInfoTo chatRoom = createChatRoom("bar").block(); - User user = new User("nerd"); - IntStream - .rangeClosed(1,100) - .mapToObj(i ->sendMessage(chatRoom, user, "Message #" + i)) - .map(result -> result - .map(MessageTo::toString) - .retryWhen(Retry.fixedDelay(10, Duration.ofSeconds(1))) - .block()) - .forEach(result -> log.info("{}", result)); - - receiveMessages(chatRoom) - .take(100) - .doOnNext(message -> log.info("message: {}", message)) - .then() - .block(); - } - - Mono createChatRoom(String name) - { - return webClient - .post() - .uri("/create") - .contentType(MediaType.TEXT_PLAIN) - .bodyValue(name) - .accept(MediaType.APPLICATION_JSON) - .exchangeToMono(response -> - { - if (response.statusCode().equals(HttpStatus.OK)) - { - return response.bodyToMono(ChatRoomInfoTo.class); - } - else - { - return response.createError(); - } - }); - } - - Mono sendMessage( - ChatRoomInfoTo chatRoom, - User user, - String message) - { - return webClient - .put() - .uri( - "/{chatRoomId}/{username}/{serial}", - chatRoom.getId(), - user.getName(), - user.nextSerial()) - .contentType(MediaType.TEXT_PLAIN) - .accept(MediaType.APPLICATION_JSON) - .bodyValue(message) - .exchangeToMono(response -> - { - if (response.statusCode().equals(HttpStatus.OK)) - { - return response.bodyToMono(MessageTo.class); - } - else - { - return response.createError(); - } - }); - } - - Flux> receiveMessages(ChatRoomInfoTo chatRoom) - { - return webClient - .get() - .uri( - "/{chatRoomId}/listen", - chatRoom.getId()) - .accept(MediaType.TEXT_EVENT_STREAM) - .retrieve() - .bodyToFlux(SSE_TYPE); - } - - - WebClient webClient; - - - abstract void setUpExtra() throws IOException, InterruptedException; - - @BeforeEach - void setUp() throws Exception - { - setUpExtra(); - haproxy.start(); - backend1.start(); - // backend2.start(); - // backend3.start(); - - Integer port = haproxy.getMappedPort(8400); - webClient = WebClient.create("http://localhost:" + port); - - Awaitility - .await() - .atMost(Duration.ofMinutes(10)) - .until(() -> WebClient - .create("http://localhost:" + backend1.getMappedPort(8080)) - .get() - .uri("/actuator/health") - .exchangeToMono(response -> - { - if (response.statusCode().equals(HttpStatus.OK)) - { - return response - .bodyToMono(StatusTo.class) - .map(StatusTo::getStatus) - .map(status -> status.equalsIgnoreCase("UP")); - } - else - { - return Mono.just(false); - } - }) - .block()); - - haproxy - .getDockerClient() - .killContainerCmd(haproxy.getContainerId()) - .withSignal("HUP") - .exec(); - - - Awaitility - .await() - .atMost(Duration.ofMinutes(10)) - .until(() -> webClient - .get() - .uri("/actuator/health") - .exchangeToMono(response -> - { - if (response.statusCode().equals(HttpStatus.OK)) - { - return response - .bodyToMono(StatusTo.class) - .map(StatusTo::getStatus) - .map(status -> status.equalsIgnoreCase("UP")); - } - else - { - return Mono.just(false); - } - }) - .block()); - } - - Network network = Network.newNetwork(); - - GenericContainer haproxy = - new GenericContainer(DockerImageName.parse("haproxytech/haproxy-debian:2.8")) - .withNetwork(network) - .withNetworkAliases("haproxy") - .withClasspathResourceMapping( - "haproxy.cfg", - "/usr/local/etc/haproxy/haproxy.cfg", - BindMode.READ_ONLY) - .withClasspathResourceMapping( - "sharding.map", - "/usr/local/etc/haproxy/sharding.map", - BindMode.READ_WRITE) - .withExposedPorts(8400, 8401, 8404) - .withLogConsumer(new Slf4jLogConsumer(log, true).withPrefix("HAPROXY")); - - abstract String[] getCommandBackend1(); - GenericContainer backend1 = - new GenericContainer(DockerImageName.parse("juplo/chat-backend:0.0.1-SNAPSHOT")) - .withImagePullPolicy(NEVER_PULL) - .withNetwork(network) - .withNetworkAliases("backend-1") - .withCommand(getCommandBackend1()) - .withExposedPorts(8080) - .waitingFor(Wait.forLogMessage(".*Started\\ ChatBackendApplication.*\\n", 1)) - .withLogConsumer(new Slf4jLogConsumer(log, true).withPrefix("BACKEND-1")); - - abstract String[] getCommandBackend2(); - GenericContainer backend2 = - new GenericContainer(DockerImageName.parse("juplo/chat-backend:0.0.1-SNAPSHOT")) - .withImagePullPolicy(NEVER_PULL) - .withNetwork(network) - .withNetworkAliases("backend-2") - .withCommand(getCommandBackend2()) - .withExposedPorts(8080) - .waitingFor(Wait.forLogMessage(".*Started\\ ChatBackendApplication.*\\n", 1)) - .withLogConsumer(new Slf4jLogConsumer(log, true).withPrefix("BACKEND-2")); - - abstract String[] getCommandBackend3(); - GenericContainer backend3 = - new GenericContainer(DockerImageName.parse("juplo/chat-backend:0.0.1-SNAPSHOT")) - .withImagePullPolicy(NEVER_PULL) - .withNetwork(network) - .withNetworkAliases("backend-3") - .withCommand(getCommandBackend3()) - .withExposedPorts(8080) - .waitingFor(Wait.forLogMessage(".*Started\\ ChatBackendApplication.*\\n", 1)) - .withLogConsumer(new Slf4jLogConsumer(log, true).withPrefix("BACKEND-3")); - - - @EqualsAndHashCode - @ToString - class User - { - @Getter - private final String name; - private int serial = 0; - - - User (String name) - { - this.name = name; - } - - - int nextSerial() - { - return ++serial; - } - } - - @Getter - @Setter - static class StatusTo - { - String status; - } + String status; } diff --git a/src/test/java/de/juplo/kafka/chat/backend/TestClient.java b/src/test/java/de/juplo/kafka/chat/backend/TestClient.java index f384f7e8..5dde9981 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/TestClient.java +++ b/src/test/java/de/juplo/kafka/chat/backend/TestClient.java @@ -2,89 +2,34 @@ package de.juplo.kafka.chat.backend; import de.juplo.kafka.chat.backend.api.ChatRoomInfoTo; import de.juplo.kafka.chat.backend.api.MessageTo; -import lombok.EqualsAndHashCode; -import lombok.Getter; -import lombok.Setter; -import lombok.ToString; import lombok.extern.slf4j.Slf4j; -import org.awaitility.Awaitility; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.springframework.core.ParameterizedTypeReference; import org.springframework.http.HttpStatus; import org.springframework.http.MediaType; -import org.springframework.http.codec.ServerSentEvent; import org.springframework.web.reactive.function.client.WebClient; -import org.testcontainers.containers.BindMode; -import org.testcontainers.containers.GenericContainer; -import org.testcontainers.containers.Network; -import org.testcontainers.containers.output.Slf4jLogConsumer; -import org.testcontainers.containers.wait.strategy.Wait; -import org.testcontainers.images.ImagePullPolicy; -import org.testcontainers.junit.jupiter.Testcontainers; -import org.testcontainers.utility.DockerImageName; -import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.util.retry.Retry; -import java.io.IOException; import java.time.Duration; import java.util.stream.IntStream; -@Testcontainers @Slf4j -public abstract class AbstractHandoverIT +public class TestClient { - static final ImagePullPolicy NEVER_PULL = imageName -> false; - static final ParameterizedTypeReference> SSE_TYPE = new ParameterizedTypeReference<>() {}; - - - @Test - void test() throws InterruptedException + public void run() { - ChatRoomInfoTo chatRoom = createChatRoom("bar").block(); - User user = new User("nerd"); IntStream .rangeClosed(1,100) - .mapToObj(i ->sendMessage(chatRoom, user, "Message #" + i)) + .mapToObj(i ->sendMessage(chatRoom, "Message #" + i)) .map(result -> result .map(MessageTo::toString) .retryWhen(Retry.fixedDelay(10, Duration.ofSeconds(1))) .block()) .forEach(result -> log.info("{}", result)); - - receiveMessages(chatRoom) - .take(100) - .doOnNext(message -> log.info("message: {}", message)) - .then() - .block(); } - Mono createChatRoom(String name) - { - return webClient - .post() - .uri("/create") - .contentType(MediaType.TEXT_PLAIN) - .bodyValue(name) - .accept(MediaType.APPLICATION_JSON) - .exchangeToMono(response -> - { - if (response.statusCode().equals(HttpStatus.OK)) - { - return response.bodyToMono(ChatRoomInfoTo.class); - } - else - { - return response.createError(); - } - }); - } - - Mono sendMessage( + private Mono sendMessage( ChatRoomInfoTo chatRoom, - User user, String message) { return webClient @@ -110,165 +55,16 @@ public abstract class AbstractHandoverIT }); } - Flux> receiveMessages(ChatRoomInfoTo chatRoom) - { - return webClient - .get() - .uri( - "/{chatRoomId}/listen", - chatRoom.getId()) - .accept(MediaType.TEXT_EVENT_STREAM) - .retrieve() - .bodyToFlux(SSE_TYPE); - } - - - WebClient webClient; + private final WebClient webClient; + private final ChatRoomInfoTo chatRoom; + private final User user; - abstract void setUpExtra() throws IOException, InterruptedException; - @BeforeEach - void setUp() throws Exception + TestClient(Integer port, ChatRoomInfoTo chatRoom, String username) { - setUpExtra(); - haproxy.start(); - backend1.start(); - // backend2.start(); - // backend3.start(); - - Integer port = haproxy.getMappedPort(8400); webClient = WebClient.create("http://localhost:" + port); - - Awaitility - .await() - .atMost(Duration.ofMinutes(10)) - .until(() -> WebClient - .create("http://localhost:" + backend1.getMappedPort(8080)) - .get() - .uri("/actuator/health") - .exchangeToMono(response -> - { - if (response.statusCode().equals(HttpStatus.OK)) - { - return response - .bodyToMono(StatusTo.class) - .map(StatusTo::getStatus) - .map(status -> status.equalsIgnoreCase("UP")); - } - else - { - return Mono.just(false); - } - }) - .block()); - - haproxy - .getDockerClient() - .killContainerCmd(haproxy.getContainerId()) - .withSignal("HUP") - .exec(); - - - Awaitility - .await() - .atMost(Duration.ofMinutes(10)) - .until(() -> webClient - .get() - .uri("/actuator/health") - .exchangeToMono(response -> - { - if (response.statusCode().equals(HttpStatus.OK)) - { - return response - .bodyToMono(StatusTo.class) - .map(StatusTo::getStatus) - .map(status -> status.equalsIgnoreCase("UP")); - } - else - { - return Mono.just(false); - } - }) - .block()); - } - - Network network = Network.newNetwork(); - - GenericContainer haproxy = - new GenericContainer(DockerImageName.parse("haproxytech/haproxy-debian:2.8")) - .withNetwork(network) - .withNetworkAliases("haproxy") - .withClasspathResourceMapping( - "haproxy.cfg", - "/usr/local/etc/haproxy/haproxy.cfg", - BindMode.READ_ONLY) - .withClasspathResourceMapping( - "sharding.map", - "/usr/local/etc/haproxy/sharding.map", - BindMode.READ_WRITE) - .withExposedPorts(8400, 8401, 8404) - .withLogConsumer(new Slf4jLogConsumer(log, true).withPrefix("HAPROXY")); - - abstract String[] getCommandBackend1(); - GenericContainer backend1 = - new GenericContainer(DockerImageName.parse("juplo/chat-backend:0.0.1-SNAPSHOT")) - .withImagePullPolicy(NEVER_PULL) - .withNetwork(network) - .withNetworkAliases("backend-1") - .withCommand(getCommandBackend1()) - .withExposedPorts(8080) - .waitingFor(Wait.forLogMessage(".*Started\\ ChatBackendApplication.*\\n", 1)) - .withLogConsumer(new Slf4jLogConsumer(log, true).withPrefix("BACKEND-1")); - - abstract String[] getCommandBackend2(); - GenericContainer backend2 = - new GenericContainer(DockerImageName.parse("juplo/chat-backend:0.0.1-SNAPSHOT")) - .withImagePullPolicy(NEVER_PULL) - .withNetwork(network) - .withNetworkAliases("backend-2") - .withCommand(getCommandBackend2()) - .withExposedPorts(8080) - .waitingFor(Wait.forLogMessage(".*Started\\ ChatBackendApplication.*\\n", 1)) - .withLogConsumer(new Slf4jLogConsumer(log, true).withPrefix("BACKEND-2")); - - abstract String[] getCommandBackend3(); - GenericContainer backend3 = - new GenericContainer(DockerImageName.parse("juplo/chat-backend:0.0.1-SNAPSHOT")) - .withImagePullPolicy(NEVER_PULL) - .withNetwork(network) - .withNetworkAliases("backend-3") - .withCommand(getCommandBackend3()) - .withExposedPorts(8080) - .waitingFor(Wait.forLogMessage(".*Started\\ ChatBackendApplication.*\\n", 1)) - .withLogConsumer(new Slf4jLogConsumer(log, true).withPrefix("BACKEND-3")); - - - @EqualsAndHashCode - @ToString - class User - { - @Getter - private final String name; - private int serial = 0; - - - User (String name) - { - this.name = name; - } - - - int nextSerial() - { - return ++serial; - } - } - - @Getter - @Setter - static class StatusTo - { - String status; + this.chatRoom = chatRoom; + user = new User(username); } } diff --git a/src/test/java/de/juplo/kafka/chat/backend/User.java b/src/test/java/de/juplo/kafka/chat/backend/User.java index f384f7e8..e708c9bf 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/User.java +++ b/src/test/java/de/juplo/kafka/chat/backend/User.java @@ -1,274 +1,27 @@ package de.juplo.kafka.chat.backend; -import de.juplo.kafka.chat.backend.api.ChatRoomInfoTo; -import de.juplo.kafka.chat.backend.api.MessageTo; import lombok.EqualsAndHashCode; import lombok.Getter; -import lombok.Setter; import lombok.ToString; -import lombok.extern.slf4j.Slf4j; -import org.awaitility.Awaitility; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.springframework.core.ParameterizedTypeReference; -import org.springframework.http.HttpStatus; -import org.springframework.http.MediaType; -import org.springframework.http.codec.ServerSentEvent; -import org.springframework.web.reactive.function.client.WebClient; -import org.testcontainers.containers.BindMode; -import org.testcontainers.containers.GenericContainer; -import org.testcontainers.containers.Network; -import org.testcontainers.containers.output.Slf4jLogConsumer; -import org.testcontainers.containers.wait.strategy.Wait; -import org.testcontainers.images.ImagePullPolicy; -import org.testcontainers.junit.jupiter.Testcontainers; -import org.testcontainers.utility.DockerImageName; -import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; -import reactor.util.retry.Retry; -import java.io.IOException; -import java.time.Duration; -import java.util.stream.IntStream; - -@Testcontainers -@Slf4j -public abstract class AbstractHandoverIT +@EqualsAndHashCode +@ToString +class User { - static final ImagePullPolicy NEVER_PULL = imageName -> false; - static final ParameterizedTypeReference> SSE_TYPE = new ParameterizedTypeReference<>() {}; - - - @Test - void test() throws InterruptedException - { - ChatRoomInfoTo chatRoom = createChatRoom("bar").block(); - User user = new User("nerd"); - IntStream - .rangeClosed(1,100) - .mapToObj(i ->sendMessage(chatRoom, user, "Message #" + i)) - .map(result -> result - .map(MessageTo::toString) - .retryWhen(Retry.fixedDelay(10, Duration.ofSeconds(1))) - .block()) - .forEach(result -> log.info("{}", result)); - - receiveMessages(chatRoom) - .take(100) - .doOnNext(message -> log.info("message: {}", message)) - .then() - .block(); - } + @Getter + private final String name; + private int serial = 0; - Mono createChatRoom(String name) - { - return webClient - .post() - .uri("/create") - .contentType(MediaType.TEXT_PLAIN) - .bodyValue(name) - .accept(MediaType.APPLICATION_JSON) - .exchangeToMono(response -> - { - if (response.statusCode().equals(HttpStatus.OK)) - { - return response.bodyToMono(ChatRoomInfoTo.class); - } - else - { - return response.createError(); - } - }); - } - Mono sendMessage( - ChatRoomInfoTo chatRoom, - User user, - String message) + User (String name) { - return webClient - .put() - .uri( - "/{chatRoomId}/{username}/{serial}", - chatRoom.getId(), - user.getName(), - user.nextSerial()) - .contentType(MediaType.TEXT_PLAIN) - .accept(MediaType.APPLICATION_JSON) - .bodyValue(message) - .exchangeToMono(response -> - { - if (response.statusCode().equals(HttpStatus.OK)) - { - return response.bodyToMono(MessageTo.class); - } - else - { - return response.createError(); - } - }); + this.name = name; } - Flux> receiveMessages(ChatRoomInfoTo chatRoom) - { - return webClient - .get() - .uri( - "/{chatRoomId}/listen", - chatRoom.getId()) - .accept(MediaType.TEXT_EVENT_STREAM) - .retrieve() - .bodyToFlux(SSE_TYPE); - } - - - WebClient webClient; - - - abstract void setUpExtra() throws IOException, InterruptedException; - - @BeforeEach - void setUp() throws Exception - { - setUpExtra(); - haproxy.start(); - backend1.start(); - // backend2.start(); - // backend3.start(); - - Integer port = haproxy.getMappedPort(8400); - webClient = WebClient.create("http://localhost:" + port); - - Awaitility - .await() - .atMost(Duration.ofMinutes(10)) - .until(() -> WebClient - .create("http://localhost:" + backend1.getMappedPort(8080)) - .get() - .uri("/actuator/health") - .exchangeToMono(response -> - { - if (response.statusCode().equals(HttpStatus.OK)) - { - return response - .bodyToMono(StatusTo.class) - .map(StatusTo::getStatus) - .map(status -> status.equalsIgnoreCase("UP")); - } - else - { - return Mono.just(false); - } - }) - .block()); - haproxy - .getDockerClient() - .killContainerCmd(haproxy.getContainerId()) - .withSignal("HUP") - .exec(); - - - Awaitility - .await() - .atMost(Duration.ofMinutes(10)) - .until(() -> webClient - .get() - .uri("/actuator/health") - .exchangeToMono(response -> - { - if (response.statusCode().equals(HttpStatus.OK)) - { - return response - .bodyToMono(StatusTo.class) - .map(StatusTo::getStatus) - .map(status -> status.equalsIgnoreCase("UP")); - } - else - { - return Mono.just(false); - } - }) - .block()); - } - - Network network = Network.newNetwork(); - - GenericContainer haproxy = - new GenericContainer(DockerImageName.parse("haproxytech/haproxy-debian:2.8")) - .withNetwork(network) - .withNetworkAliases("haproxy") - .withClasspathResourceMapping( - "haproxy.cfg", - "/usr/local/etc/haproxy/haproxy.cfg", - BindMode.READ_ONLY) - .withClasspathResourceMapping( - "sharding.map", - "/usr/local/etc/haproxy/sharding.map", - BindMode.READ_WRITE) - .withExposedPorts(8400, 8401, 8404) - .withLogConsumer(new Slf4jLogConsumer(log, true).withPrefix("HAPROXY")); - - abstract String[] getCommandBackend1(); - GenericContainer backend1 = - new GenericContainer(DockerImageName.parse("juplo/chat-backend:0.0.1-SNAPSHOT")) - .withImagePullPolicy(NEVER_PULL) - .withNetwork(network) - .withNetworkAliases("backend-1") - .withCommand(getCommandBackend1()) - .withExposedPorts(8080) - .waitingFor(Wait.forLogMessage(".*Started\\ ChatBackendApplication.*\\n", 1)) - .withLogConsumer(new Slf4jLogConsumer(log, true).withPrefix("BACKEND-1")); - - abstract String[] getCommandBackend2(); - GenericContainer backend2 = - new GenericContainer(DockerImageName.parse("juplo/chat-backend:0.0.1-SNAPSHOT")) - .withImagePullPolicy(NEVER_PULL) - .withNetwork(network) - .withNetworkAliases("backend-2") - .withCommand(getCommandBackend2()) - .withExposedPorts(8080) - .waitingFor(Wait.forLogMessage(".*Started\\ ChatBackendApplication.*\\n", 1)) - .withLogConsumer(new Slf4jLogConsumer(log, true).withPrefix("BACKEND-2")); - - abstract String[] getCommandBackend3(); - GenericContainer backend3 = - new GenericContainer(DockerImageName.parse("juplo/chat-backend:0.0.1-SNAPSHOT")) - .withImagePullPolicy(NEVER_PULL) - .withNetwork(network) - .withNetworkAliases("backend-3") - .withCommand(getCommandBackend3()) - .withExposedPorts(8080) - .waitingFor(Wait.forLogMessage(".*Started\\ ChatBackendApplication.*\\n", 1)) - .withLogConsumer(new Slf4jLogConsumer(log, true).withPrefix("BACKEND-3")); - - - @EqualsAndHashCode - @ToString - class User - { - @Getter - private final String name; - private int serial = 0; - - - User (String name) - { - this.name = name; - } - - - int nextSerial() - { - return ++serial; - } - } - - @Getter - @Setter - static class StatusTo + int nextSerial() { - String status; + return ++serial; } } -- 2.20.1