From bab72922a9c72b52053baa8cbd7d4c07455b7b95 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Tue, 27 Feb 2024 09:31:51 +0100 Subject: [PATCH] test: HandoverIT-POC - Splitted up code into smaller classes -- MOVE --- .../backend/AbstractHandoverITContainers.java | 207 +++++++++++++ .../backend/KafkaHandoverITContainers.java | 108 +++++++ .../de/juplo/kafka/chat/backend/StatusTo.java | 274 ++++++++++++++++++ .../juplo/kafka/chat/backend/TestClient.java | 274 ++++++++++++++++++ .../de/juplo/kafka/chat/backend/User.java | 274 ++++++++++++++++++ 5 files changed, 1137 insertions(+) create mode 100644 src/test/java/de/juplo/kafka/chat/backend/AbstractHandoverITContainers.java create mode 100644 src/test/java/de/juplo/kafka/chat/backend/KafkaHandoverITContainers.java create mode 100644 src/test/java/de/juplo/kafka/chat/backend/StatusTo.java create mode 100644 src/test/java/de/juplo/kafka/chat/backend/TestClient.java create mode 100644 src/test/java/de/juplo/kafka/chat/backend/User.java diff --git a/src/test/java/de/juplo/kafka/chat/backend/AbstractHandoverITContainers.java b/src/test/java/de/juplo/kafka/chat/backend/AbstractHandoverITContainers.java new file mode 100644 index 00000000..b22166fa --- /dev/null +++ b/src/test/java/de/juplo/kafka/chat/backend/AbstractHandoverITContainers.java @@ -0,0 +1,207 @@ +package de.juplo.kafka.chat.backend; + +import de.juplo.kafka.chat.backend.api.ChatRoomInfoTo; +import de.juplo.kafka.chat.backend.api.MessageTo; +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.Setter; +import lombok.ToString; +import lombok.extern.slf4j.Slf4j; +import org.awaitility.Awaitility; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.springframework.core.ParameterizedTypeReference; +import org.springframework.http.HttpStatus; +import org.springframework.http.MediaType; +import org.springframework.http.codec.ServerSentEvent; +import org.springframework.web.reactive.function.client.WebClient; +import org.testcontainers.containers.BindMode; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.Network; +import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.containers.wait.strategy.Wait; +import org.testcontainers.images.ImagePullPolicy; +import org.testcontainers.junit.jupiter.Testcontainers; +import org.testcontainers.utility.DockerImageName; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.util.retry.Retry; + +import java.io.IOException; +import java.time.Duration; +import java.util.stream.IntStream; + + +@Testcontainers +@Slf4j +public abstract class AbstractHandoverIT +{ + static final ImagePullPolicy NEVER_PULL = imageName -> false; + + + @Test + void test() throws InterruptedException + { + ChatRoomInfoTo chatRoom = createChatRoom("bar").block(); + User user = new User("nerd"); + IntStream + .rangeClosed(1,100) + .mapToObj(i ->sendMessage(chatRoom, user, "Message #" + i)) + .map(result -> result + .map(MessageTo::toString) + .retryWhen(Retry.fixedDelay(10, Duration.ofSeconds(1))) + .block()) + .forEach(result -> log.info("{}", result)); + + receiveMessages(chatRoom) + .take(100) + .doOnNext(message -> log.info("message: {}", message)) + .then() + .block(); + } + + + abstract void setUpExtra() throws IOException, InterruptedException; + + @BeforeEach + void setUp() throws Exception + { + setUpExtra(); + haproxy.start(); + backend1.start(); + // backend2.start(); + // backend3.start(); + + Awaitility + .await() + .atMost(Duration.ofMinutes(10)) + .until(() -> WebClient + .create("http://localhost:" + backend1.getMappedPort(8080)) + .get() + .uri("/actuator/health") + .exchangeToMono(response -> + { + if (response.statusCode().equals(HttpStatus.OK)) + { + return response + .bodyToMono(StatusTo.class) + .map(StatusTo::getStatus) + .map(status -> status.equalsIgnoreCase("UP")); + } + else + { + return Mono.just(false); + } + }) + .block()); + + haproxy + .getDockerClient() + .killContainerCmd(haproxy.getContainerId()) + .withSignal("HUP") + .exec(); + + + Awaitility + .await() + .atMost(Duration.ofMinutes(10)) + .until(() -> WebClient + .create("http://localhost:" + haproxy.getMappedPort(8400)) + .get() + .uri("/actuator/health") + .exchangeToMono(response -> + { + if (response.statusCode().equals(HttpStatus.OK)) + { + return response + .bodyToMono(StatusTo.class) + .map(StatusTo::getStatus) + .map(status -> status.equalsIgnoreCase("UP")); + } + else + { + return Mono.just(false); + } + }) + .block()); + } + + Network network = Network.newNetwork(); + + GenericContainer haproxy = + new GenericContainer(DockerImageName.parse("haproxytech/haproxy-debian:2.8")) + .withNetwork(network) + .withNetworkAliases("haproxy") + .withClasspathResourceMapping( + "haproxy.cfg", + "/usr/local/etc/haproxy/haproxy.cfg", + BindMode.READ_ONLY) + .withClasspathResourceMapping( + "sharding.map", + "/usr/local/etc/haproxy/sharding.map", + BindMode.READ_WRITE) + .withExposedPorts(8400, 8401, 8404) + .withLogConsumer(new Slf4jLogConsumer(log, true).withPrefix("HAPROXY")); + + abstract String[] getCommandBackend1(); + GenericContainer backend1 = + new GenericContainer(DockerImageName.parse("juplo/chat-backend:0.0.1-SNAPSHOT")) + .withImagePullPolicy(NEVER_PULL) + .withNetwork(network) + .withNetworkAliases("backend-1") + .withCommand(getCommandBackend1()) + .withExposedPorts(8080) + .waitingFor(Wait.forLogMessage(".*Started\\ ChatBackendApplication.*\\n", 1)) + .withLogConsumer(new Slf4jLogConsumer(log, true).withPrefix("BACKEND-1")); + + abstract String[] getCommandBackend2(); + GenericContainer backend2 = + new GenericContainer(DockerImageName.parse("juplo/chat-backend:0.0.1-SNAPSHOT")) + .withImagePullPolicy(NEVER_PULL) + .withNetwork(network) + .withNetworkAliases("backend-2") + .withCommand(getCommandBackend2()) + .withExposedPorts(8080) + .waitingFor(Wait.forLogMessage(".*Started\\ ChatBackendApplication.*\\n", 1)) + .withLogConsumer(new Slf4jLogConsumer(log, true).withPrefix("BACKEND-2")); + + abstract String[] getCommandBackend3(); + GenericContainer backend3 = + new GenericContainer(DockerImageName.parse("juplo/chat-backend:0.0.1-SNAPSHOT")) + .withImagePullPolicy(NEVER_PULL) + .withNetwork(network) + .withNetworkAliases("backend-3") + .withCommand(getCommandBackend3()) + .withExposedPorts(8080) + .waitingFor(Wait.forLogMessage(".*Started\\ ChatBackendApplication.*\\n", 1)) + .withLogConsumer(new Slf4jLogConsumer(log, true).withPrefix("BACKEND-3")); + + + @EqualsAndHashCode + @ToString + class User + { + @Getter + private final String name; + private int serial = 0; + + + User (String name) + { + this.name = name; + } + + + int nextSerial() + { + return ++serial; + } + } + + @Getter + @Setter + static class StatusTo + { + String status; + } +} diff --git a/src/test/java/de/juplo/kafka/chat/backend/KafkaHandoverITContainers.java b/src/test/java/de/juplo/kafka/chat/backend/KafkaHandoverITContainers.java new file mode 100644 index 00000000..7c511fd9 --- /dev/null +++ b/src/test/java/de/juplo/kafka/chat/backend/KafkaHandoverITContainers.java @@ -0,0 +1,108 @@ +package de.juplo.kafka.chat.backend; + +import lombok.extern.slf4j.Slf4j; +import org.testcontainers.containers.*; +import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.containers.wait.strategy.Wait; +import org.testcontainers.utility.DockerImageName; + +import java.io.IOException; + + +@Slf4j +class KafkaHandoverIT extends AbstractHandoverIT +{ + @Override + void setUpExtra() throws IOException, InterruptedException + { + kafka.start(); + + Container.ExecResult result; + result = kafka.execInContainer( + "kafka-topics", + "--bootstrap-server", + "kafka:9999", + "--create", + "--topic", + "info_channel", + "--partitions", + "3"); + log.info( + "EXIT-CODE={}, STDOUT={}, STDERR={}", + result.getExitCode(), + result.getStdout(), + result.getStdout()); + result = kafka.execInContainer( + "kafka-topics", + "--bootstrap-server", + "kafka:9999", + "--create", + "--topic", + "data_channel", + "--partitions", + "10"); + log.info( + "EXIT-CODE={}, STDOUT={}, STDERR={}", + result.getExitCode(), + result.getStdout(), + result.getStdout()); + } + + + KafkaContainer kafka = + new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.4.0")) + .withNetwork(network) + .withNetworkAliases("kafka") + .withListener(() -> "kafka:9999") + .withKraft() + .waitingFor(Wait.forLogMessage(".*Kafka\\ Server\\ started.*\\n", 1)) + .withLogConsumer(new Slf4jLogConsumer(log, true).withPrefix("KAFKA")); + + @Override + String[] getCommandBackend1() + { + return new String[] + { + "--chat.backend.instance-id=backend-1", + "--chat.backend.services=kafka", + "--chat.backend.kafka.bootstrap-servers=kafka:9999", + "--chat.backend.kafka.instance-uri=http://backend-1:8080", + "--chat.backend.kafka.num-partitions=10", + "--chat.backend.kafka.client-id-prefix=B1", + "--chat.backend.kafka.haproxy-runtime-api=haproxy:8401", + "--chat.backend.kafka.haproxy-map=/usr/local/etc/haproxy/sharding.map" + }; + } + + @Override + String[] getCommandBackend2() + { + return new String[] + { + "--chat.backend.instance-id=backend-2", + "--chat.backend.services=kafka", + "--chat.backend.kafka.bootstrap-servers=kafka:9999", + "--chat.backend.kafka.instance-uri=http://backend-2:8080", + "--chat.backend.kafka.num-partitions=10", + "--chat.backend.kafka.client-id-prefix=B2", + "--chat.backend.kafka.haproxy-runtime-api=haproxy:8401", + "--chat.backend.kafka.haproxy-map=/usr/local/etc/haproxy/sharding.map" + }; + } + + @Override + String[] getCommandBackend3() + { + return new String[] + { + "--chat.backend.instance-id=backend-3", + "--chat.backend.services=kafka", + "--chat.backend.kafka.bootstrap-servers=kafka:9999", + "--chat.backend.kafka.instance-uri=http://backend-3:8080", + "--chat.backend.kafka.num-partitions=10", + "--chat.backend.kafka.client-id-prefix=B3", + "--chat.backend.kafka.haproxy-runtime-api=haproxy:8401", + "--chat.backend.kafka.haproxy-map=/usr/local/etc/haproxy/sharding.map" + }; + } +} diff --git a/src/test/java/de/juplo/kafka/chat/backend/StatusTo.java b/src/test/java/de/juplo/kafka/chat/backend/StatusTo.java new file mode 100644 index 00000000..f384f7e8 --- /dev/null +++ b/src/test/java/de/juplo/kafka/chat/backend/StatusTo.java @@ -0,0 +1,274 @@ +package de.juplo.kafka.chat.backend; + +import de.juplo.kafka.chat.backend.api.ChatRoomInfoTo; +import de.juplo.kafka.chat.backend.api.MessageTo; +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.Setter; +import lombok.ToString; +import lombok.extern.slf4j.Slf4j; +import org.awaitility.Awaitility; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.springframework.core.ParameterizedTypeReference; +import org.springframework.http.HttpStatus; +import org.springframework.http.MediaType; +import org.springframework.http.codec.ServerSentEvent; +import org.springframework.web.reactive.function.client.WebClient; +import org.testcontainers.containers.BindMode; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.Network; +import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.containers.wait.strategy.Wait; +import org.testcontainers.images.ImagePullPolicy; +import org.testcontainers.junit.jupiter.Testcontainers; +import org.testcontainers.utility.DockerImageName; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.util.retry.Retry; + +import java.io.IOException; +import java.time.Duration; +import java.util.stream.IntStream; + + +@Testcontainers +@Slf4j +public abstract class AbstractHandoverIT +{ + static final ImagePullPolicy NEVER_PULL = imageName -> false; + static final ParameterizedTypeReference> SSE_TYPE = new ParameterizedTypeReference<>() {}; + + + @Test + void test() throws InterruptedException + { + ChatRoomInfoTo chatRoom = createChatRoom("bar").block(); + User user = new User("nerd"); + IntStream + .rangeClosed(1,100) + .mapToObj(i ->sendMessage(chatRoom, user, "Message #" + i)) + .map(result -> result + .map(MessageTo::toString) + .retryWhen(Retry.fixedDelay(10, Duration.ofSeconds(1))) + .block()) + .forEach(result -> log.info("{}", result)); + + receiveMessages(chatRoom) + .take(100) + .doOnNext(message -> log.info("message: {}", message)) + .then() + .block(); + } + + Mono createChatRoom(String name) + { + return webClient + .post() + .uri("/create") + .contentType(MediaType.TEXT_PLAIN) + .bodyValue(name) + .accept(MediaType.APPLICATION_JSON) + .exchangeToMono(response -> + { + if (response.statusCode().equals(HttpStatus.OK)) + { + return response.bodyToMono(ChatRoomInfoTo.class); + } + else + { + return response.createError(); + } + }); + } + + Mono sendMessage( + ChatRoomInfoTo chatRoom, + User user, + String message) + { + return webClient + .put() + .uri( + "/{chatRoomId}/{username}/{serial}", + chatRoom.getId(), + user.getName(), + user.nextSerial()) + .contentType(MediaType.TEXT_PLAIN) + .accept(MediaType.APPLICATION_JSON) + .bodyValue(message) + .exchangeToMono(response -> + { + if (response.statusCode().equals(HttpStatus.OK)) + { + return response.bodyToMono(MessageTo.class); + } + else + { + return response.createError(); + } + }); + } + + Flux> receiveMessages(ChatRoomInfoTo chatRoom) + { + return webClient + .get() + .uri( + "/{chatRoomId}/listen", + chatRoom.getId()) + .accept(MediaType.TEXT_EVENT_STREAM) + .retrieve() + .bodyToFlux(SSE_TYPE); + } + + + WebClient webClient; + + + abstract void setUpExtra() throws IOException, InterruptedException; + + @BeforeEach + void setUp() throws Exception + { + setUpExtra(); + haproxy.start(); + backend1.start(); + // backend2.start(); + // backend3.start(); + + Integer port = haproxy.getMappedPort(8400); + webClient = WebClient.create("http://localhost:" + port); + + Awaitility + .await() + .atMost(Duration.ofMinutes(10)) + .until(() -> WebClient + .create("http://localhost:" + backend1.getMappedPort(8080)) + .get() + .uri("/actuator/health") + .exchangeToMono(response -> + { + if (response.statusCode().equals(HttpStatus.OK)) + { + return response + .bodyToMono(StatusTo.class) + .map(StatusTo::getStatus) + .map(status -> status.equalsIgnoreCase("UP")); + } + else + { + return Mono.just(false); + } + }) + .block()); + + haproxy + .getDockerClient() + .killContainerCmd(haproxy.getContainerId()) + .withSignal("HUP") + .exec(); + + + Awaitility + .await() + .atMost(Duration.ofMinutes(10)) + .until(() -> webClient + .get() + .uri("/actuator/health") + .exchangeToMono(response -> + { + if (response.statusCode().equals(HttpStatus.OK)) + { + return response + .bodyToMono(StatusTo.class) + .map(StatusTo::getStatus) + .map(status -> status.equalsIgnoreCase("UP")); + } + else + { + return Mono.just(false); + } + }) + .block()); + } + + Network network = Network.newNetwork(); + + GenericContainer haproxy = + new GenericContainer(DockerImageName.parse("haproxytech/haproxy-debian:2.8")) + .withNetwork(network) + .withNetworkAliases("haproxy") + .withClasspathResourceMapping( + "haproxy.cfg", + "/usr/local/etc/haproxy/haproxy.cfg", + BindMode.READ_ONLY) + .withClasspathResourceMapping( + "sharding.map", + "/usr/local/etc/haproxy/sharding.map", + BindMode.READ_WRITE) + .withExposedPorts(8400, 8401, 8404) + .withLogConsumer(new Slf4jLogConsumer(log, true).withPrefix("HAPROXY")); + + abstract String[] getCommandBackend1(); + GenericContainer backend1 = + new GenericContainer(DockerImageName.parse("juplo/chat-backend:0.0.1-SNAPSHOT")) + .withImagePullPolicy(NEVER_PULL) + .withNetwork(network) + .withNetworkAliases("backend-1") + .withCommand(getCommandBackend1()) + .withExposedPorts(8080) + .waitingFor(Wait.forLogMessage(".*Started\\ ChatBackendApplication.*\\n", 1)) + .withLogConsumer(new Slf4jLogConsumer(log, true).withPrefix("BACKEND-1")); + + abstract String[] getCommandBackend2(); + GenericContainer backend2 = + new GenericContainer(DockerImageName.parse("juplo/chat-backend:0.0.1-SNAPSHOT")) + .withImagePullPolicy(NEVER_PULL) + .withNetwork(network) + .withNetworkAliases("backend-2") + .withCommand(getCommandBackend2()) + .withExposedPorts(8080) + .waitingFor(Wait.forLogMessage(".*Started\\ ChatBackendApplication.*\\n", 1)) + .withLogConsumer(new Slf4jLogConsumer(log, true).withPrefix("BACKEND-2")); + + abstract String[] getCommandBackend3(); + GenericContainer backend3 = + new GenericContainer(DockerImageName.parse("juplo/chat-backend:0.0.1-SNAPSHOT")) + .withImagePullPolicy(NEVER_PULL) + .withNetwork(network) + .withNetworkAliases("backend-3") + .withCommand(getCommandBackend3()) + .withExposedPorts(8080) + .waitingFor(Wait.forLogMessage(".*Started\\ ChatBackendApplication.*\\n", 1)) + .withLogConsumer(new Slf4jLogConsumer(log, true).withPrefix("BACKEND-3")); + + + @EqualsAndHashCode + @ToString + class User + { + @Getter + private final String name; + private int serial = 0; + + + User (String name) + { + this.name = name; + } + + + int nextSerial() + { + return ++serial; + } + } + + @Getter + @Setter + static class StatusTo + { + String status; + } +} diff --git a/src/test/java/de/juplo/kafka/chat/backend/TestClient.java b/src/test/java/de/juplo/kafka/chat/backend/TestClient.java new file mode 100644 index 00000000..f384f7e8 --- /dev/null +++ b/src/test/java/de/juplo/kafka/chat/backend/TestClient.java @@ -0,0 +1,274 @@ +package de.juplo.kafka.chat.backend; + +import de.juplo.kafka.chat.backend.api.ChatRoomInfoTo; +import de.juplo.kafka.chat.backend.api.MessageTo; +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.Setter; +import lombok.ToString; +import lombok.extern.slf4j.Slf4j; +import org.awaitility.Awaitility; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.springframework.core.ParameterizedTypeReference; +import org.springframework.http.HttpStatus; +import org.springframework.http.MediaType; +import org.springframework.http.codec.ServerSentEvent; +import org.springframework.web.reactive.function.client.WebClient; +import org.testcontainers.containers.BindMode; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.Network; +import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.containers.wait.strategy.Wait; +import org.testcontainers.images.ImagePullPolicy; +import org.testcontainers.junit.jupiter.Testcontainers; +import org.testcontainers.utility.DockerImageName; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.util.retry.Retry; + +import java.io.IOException; +import java.time.Duration; +import java.util.stream.IntStream; + + +@Testcontainers +@Slf4j +public abstract class AbstractHandoverIT +{ + static final ImagePullPolicy NEVER_PULL = imageName -> false; + static final ParameterizedTypeReference> SSE_TYPE = new ParameterizedTypeReference<>() {}; + + + @Test + void test() throws InterruptedException + { + ChatRoomInfoTo chatRoom = createChatRoom("bar").block(); + User user = new User("nerd"); + IntStream + .rangeClosed(1,100) + .mapToObj(i ->sendMessage(chatRoom, user, "Message #" + i)) + .map(result -> result + .map(MessageTo::toString) + .retryWhen(Retry.fixedDelay(10, Duration.ofSeconds(1))) + .block()) + .forEach(result -> log.info("{}", result)); + + receiveMessages(chatRoom) + .take(100) + .doOnNext(message -> log.info("message: {}", message)) + .then() + .block(); + } + + Mono createChatRoom(String name) + { + return webClient + .post() + .uri("/create") + .contentType(MediaType.TEXT_PLAIN) + .bodyValue(name) + .accept(MediaType.APPLICATION_JSON) + .exchangeToMono(response -> + { + if (response.statusCode().equals(HttpStatus.OK)) + { + return response.bodyToMono(ChatRoomInfoTo.class); + } + else + { + return response.createError(); + } + }); + } + + Mono sendMessage( + ChatRoomInfoTo chatRoom, + User user, + String message) + { + return webClient + .put() + .uri( + "/{chatRoomId}/{username}/{serial}", + chatRoom.getId(), + user.getName(), + user.nextSerial()) + .contentType(MediaType.TEXT_PLAIN) + .accept(MediaType.APPLICATION_JSON) + .bodyValue(message) + .exchangeToMono(response -> + { + if (response.statusCode().equals(HttpStatus.OK)) + { + return response.bodyToMono(MessageTo.class); + } + else + { + return response.createError(); + } + }); + } + + Flux> receiveMessages(ChatRoomInfoTo chatRoom) + { + return webClient + .get() + .uri( + "/{chatRoomId}/listen", + chatRoom.getId()) + .accept(MediaType.TEXT_EVENT_STREAM) + .retrieve() + .bodyToFlux(SSE_TYPE); + } + + + WebClient webClient; + + + abstract void setUpExtra() throws IOException, InterruptedException; + + @BeforeEach + void setUp() throws Exception + { + setUpExtra(); + haproxy.start(); + backend1.start(); + // backend2.start(); + // backend3.start(); + + Integer port = haproxy.getMappedPort(8400); + webClient = WebClient.create("http://localhost:" + port); + + Awaitility + .await() + .atMost(Duration.ofMinutes(10)) + .until(() -> WebClient + .create("http://localhost:" + backend1.getMappedPort(8080)) + .get() + .uri("/actuator/health") + .exchangeToMono(response -> + { + if (response.statusCode().equals(HttpStatus.OK)) + { + return response + .bodyToMono(StatusTo.class) + .map(StatusTo::getStatus) + .map(status -> status.equalsIgnoreCase("UP")); + } + else + { + return Mono.just(false); + } + }) + .block()); + + haproxy + .getDockerClient() + .killContainerCmd(haproxy.getContainerId()) + .withSignal("HUP") + .exec(); + + + Awaitility + .await() + .atMost(Duration.ofMinutes(10)) + .until(() -> webClient + .get() + .uri("/actuator/health") + .exchangeToMono(response -> + { + if (response.statusCode().equals(HttpStatus.OK)) + { + return response + .bodyToMono(StatusTo.class) + .map(StatusTo::getStatus) + .map(status -> status.equalsIgnoreCase("UP")); + } + else + { + return Mono.just(false); + } + }) + .block()); + } + + Network network = Network.newNetwork(); + + GenericContainer haproxy = + new GenericContainer(DockerImageName.parse("haproxytech/haproxy-debian:2.8")) + .withNetwork(network) + .withNetworkAliases("haproxy") + .withClasspathResourceMapping( + "haproxy.cfg", + "/usr/local/etc/haproxy/haproxy.cfg", + BindMode.READ_ONLY) + .withClasspathResourceMapping( + "sharding.map", + "/usr/local/etc/haproxy/sharding.map", + BindMode.READ_WRITE) + .withExposedPorts(8400, 8401, 8404) + .withLogConsumer(new Slf4jLogConsumer(log, true).withPrefix("HAPROXY")); + + abstract String[] getCommandBackend1(); + GenericContainer backend1 = + new GenericContainer(DockerImageName.parse("juplo/chat-backend:0.0.1-SNAPSHOT")) + .withImagePullPolicy(NEVER_PULL) + .withNetwork(network) + .withNetworkAliases("backend-1") + .withCommand(getCommandBackend1()) + .withExposedPorts(8080) + .waitingFor(Wait.forLogMessage(".*Started\\ ChatBackendApplication.*\\n", 1)) + .withLogConsumer(new Slf4jLogConsumer(log, true).withPrefix("BACKEND-1")); + + abstract String[] getCommandBackend2(); + GenericContainer backend2 = + new GenericContainer(DockerImageName.parse("juplo/chat-backend:0.0.1-SNAPSHOT")) + .withImagePullPolicy(NEVER_PULL) + .withNetwork(network) + .withNetworkAliases("backend-2") + .withCommand(getCommandBackend2()) + .withExposedPorts(8080) + .waitingFor(Wait.forLogMessage(".*Started\\ ChatBackendApplication.*\\n", 1)) + .withLogConsumer(new Slf4jLogConsumer(log, true).withPrefix("BACKEND-2")); + + abstract String[] getCommandBackend3(); + GenericContainer backend3 = + new GenericContainer(DockerImageName.parse("juplo/chat-backend:0.0.1-SNAPSHOT")) + .withImagePullPolicy(NEVER_PULL) + .withNetwork(network) + .withNetworkAliases("backend-3") + .withCommand(getCommandBackend3()) + .withExposedPorts(8080) + .waitingFor(Wait.forLogMessage(".*Started\\ ChatBackendApplication.*\\n", 1)) + .withLogConsumer(new Slf4jLogConsumer(log, true).withPrefix("BACKEND-3")); + + + @EqualsAndHashCode + @ToString + class User + { + @Getter + private final String name; + private int serial = 0; + + + User (String name) + { + this.name = name; + } + + + int nextSerial() + { + return ++serial; + } + } + + @Getter + @Setter + static class StatusTo + { + String status; + } +} diff --git a/src/test/java/de/juplo/kafka/chat/backend/User.java b/src/test/java/de/juplo/kafka/chat/backend/User.java new file mode 100644 index 00000000..f384f7e8 --- /dev/null +++ b/src/test/java/de/juplo/kafka/chat/backend/User.java @@ -0,0 +1,274 @@ +package de.juplo.kafka.chat.backend; + +import de.juplo.kafka.chat.backend.api.ChatRoomInfoTo; +import de.juplo.kafka.chat.backend.api.MessageTo; +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.Setter; +import lombok.ToString; +import lombok.extern.slf4j.Slf4j; +import org.awaitility.Awaitility; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.springframework.core.ParameterizedTypeReference; +import org.springframework.http.HttpStatus; +import org.springframework.http.MediaType; +import org.springframework.http.codec.ServerSentEvent; +import org.springframework.web.reactive.function.client.WebClient; +import org.testcontainers.containers.BindMode; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.Network; +import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.containers.wait.strategy.Wait; +import org.testcontainers.images.ImagePullPolicy; +import org.testcontainers.junit.jupiter.Testcontainers; +import org.testcontainers.utility.DockerImageName; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.util.retry.Retry; + +import java.io.IOException; +import java.time.Duration; +import java.util.stream.IntStream; + + +@Testcontainers +@Slf4j +public abstract class AbstractHandoverIT +{ + static final ImagePullPolicy NEVER_PULL = imageName -> false; + static final ParameterizedTypeReference> SSE_TYPE = new ParameterizedTypeReference<>() {}; + + + @Test + void test() throws InterruptedException + { + ChatRoomInfoTo chatRoom = createChatRoom("bar").block(); + User user = new User("nerd"); + IntStream + .rangeClosed(1,100) + .mapToObj(i ->sendMessage(chatRoom, user, "Message #" + i)) + .map(result -> result + .map(MessageTo::toString) + .retryWhen(Retry.fixedDelay(10, Duration.ofSeconds(1))) + .block()) + .forEach(result -> log.info("{}", result)); + + receiveMessages(chatRoom) + .take(100) + .doOnNext(message -> log.info("message: {}", message)) + .then() + .block(); + } + + Mono createChatRoom(String name) + { + return webClient + .post() + .uri("/create") + .contentType(MediaType.TEXT_PLAIN) + .bodyValue(name) + .accept(MediaType.APPLICATION_JSON) + .exchangeToMono(response -> + { + if (response.statusCode().equals(HttpStatus.OK)) + { + return response.bodyToMono(ChatRoomInfoTo.class); + } + else + { + return response.createError(); + } + }); + } + + Mono sendMessage( + ChatRoomInfoTo chatRoom, + User user, + String message) + { + return webClient + .put() + .uri( + "/{chatRoomId}/{username}/{serial}", + chatRoom.getId(), + user.getName(), + user.nextSerial()) + .contentType(MediaType.TEXT_PLAIN) + .accept(MediaType.APPLICATION_JSON) + .bodyValue(message) + .exchangeToMono(response -> + { + if (response.statusCode().equals(HttpStatus.OK)) + { + return response.bodyToMono(MessageTo.class); + } + else + { + return response.createError(); + } + }); + } + + Flux> receiveMessages(ChatRoomInfoTo chatRoom) + { + return webClient + .get() + .uri( + "/{chatRoomId}/listen", + chatRoom.getId()) + .accept(MediaType.TEXT_EVENT_STREAM) + .retrieve() + .bodyToFlux(SSE_TYPE); + } + + + WebClient webClient; + + + abstract void setUpExtra() throws IOException, InterruptedException; + + @BeforeEach + void setUp() throws Exception + { + setUpExtra(); + haproxy.start(); + backend1.start(); + // backend2.start(); + // backend3.start(); + + Integer port = haproxy.getMappedPort(8400); + webClient = WebClient.create("http://localhost:" + port); + + Awaitility + .await() + .atMost(Duration.ofMinutes(10)) + .until(() -> WebClient + .create("http://localhost:" + backend1.getMappedPort(8080)) + .get() + .uri("/actuator/health") + .exchangeToMono(response -> + { + if (response.statusCode().equals(HttpStatus.OK)) + { + return response + .bodyToMono(StatusTo.class) + .map(StatusTo::getStatus) + .map(status -> status.equalsIgnoreCase("UP")); + } + else + { + return Mono.just(false); + } + }) + .block()); + + haproxy + .getDockerClient() + .killContainerCmd(haproxy.getContainerId()) + .withSignal("HUP") + .exec(); + + + Awaitility + .await() + .atMost(Duration.ofMinutes(10)) + .until(() -> webClient + .get() + .uri("/actuator/health") + .exchangeToMono(response -> + { + if (response.statusCode().equals(HttpStatus.OK)) + { + return response + .bodyToMono(StatusTo.class) + .map(StatusTo::getStatus) + .map(status -> status.equalsIgnoreCase("UP")); + } + else + { + return Mono.just(false); + } + }) + .block()); + } + + Network network = Network.newNetwork(); + + GenericContainer haproxy = + new GenericContainer(DockerImageName.parse("haproxytech/haproxy-debian:2.8")) + .withNetwork(network) + .withNetworkAliases("haproxy") + .withClasspathResourceMapping( + "haproxy.cfg", + "/usr/local/etc/haproxy/haproxy.cfg", + BindMode.READ_ONLY) + .withClasspathResourceMapping( + "sharding.map", + "/usr/local/etc/haproxy/sharding.map", + BindMode.READ_WRITE) + .withExposedPorts(8400, 8401, 8404) + .withLogConsumer(new Slf4jLogConsumer(log, true).withPrefix("HAPROXY")); + + abstract String[] getCommandBackend1(); + GenericContainer backend1 = + new GenericContainer(DockerImageName.parse("juplo/chat-backend:0.0.1-SNAPSHOT")) + .withImagePullPolicy(NEVER_PULL) + .withNetwork(network) + .withNetworkAliases("backend-1") + .withCommand(getCommandBackend1()) + .withExposedPorts(8080) + .waitingFor(Wait.forLogMessage(".*Started\\ ChatBackendApplication.*\\n", 1)) + .withLogConsumer(new Slf4jLogConsumer(log, true).withPrefix("BACKEND-1")); + + abstract String[] getCommandBackend2(); + GenericContainer backend2 = + new GenericContainer(DockerImageName.parse("juplo/chat-backend:0.0.1-SNAPSHOT")) + .withImagePullPolicy(NEVER_PULL) + .withNetwork(network) + .withNetworkAliases("backend-2") + .withCommand(getCommandBackend2()) + .withExposedPorts(8080) + .waitingFor(Wait.forLogMessage(".*Started\\ ChatBackendApplication.*\\n", 1)) + .withLogConsumer(new Slf4jLogConsumer(log, true).withPrefix("BACKEND-2")); + + abstract String[] getCommandBackend3(); + GenericContainer backend3 = + new GenericContainer(DockerImageName.parse("juplo/chat-backend:0.0.1-SNAPSHOT")) + .withImagePullPolicy(NEVER_PULL) + .withNetwork(network) + .withNetworkAliases("backend-3") + .withCommand(getCommandBackend3()) + .withExposedPorts(8080) + .waitingFor(Wait.forLogMessage(".*Started\\ ChatBackendApplication.*\\n", 1)) + .withLogConsumer(new Slf4jLogConsumer(log, true).withPrefix("BACKEND-3")); + + + @EqualsAndHashCode + @ToString + class User + { + @Getter + private final String name; + private int serial = 0; + + + User (String name) + { + this.name = name; + } + + + int nextSerial() + { + return ++serial; + } + } + + @Getter + @Setter + static class StatusTo + { + String status; + } +} -- 2.20.1