From: Kai Moritz Date: Mon, 25 Mar 2024 21:07:34 +0000 (+0100) Subject: refactor: Moved the ``HandoverIT``-tests into the ``api``-package -- MOVE X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=704b1755dc9494730bd163308e4ada8f2fe3fae8;p=demos%2Fkafka%2Fchat refactor: Moved the ``HandoverIT``-tests into the ``api``-package -- MOVE --- diff --git a/src/test/java/de/juplo/kafka/chat/backend/AbstractHandoverIT.java b/src/test/java/de/juplo/kafka/chat/backend/AbstractHandoverIT.java deleted file mode 100644 index e92155d6..00000000 --- a/src/test/java/de/juplo/kafka/chat/backend/AbstractHandoverIT.java +++ /dev/null @@ -1,143 +0,0 @@ -package de.juplo.kafka.chat.backend; - -import de.juplo.kafka.chat.backend.api.ChatRoomInfoTo; -import de.juplo.kafka.chat.backend.api.MessageTo; -import lombok.extern.slf4j.Slf4j; -import org.awaitility.Awaitility; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.springframework.http.HttpStatus; -import org.springframework.http.MediaType; -import org.springframework.web.reactive.function.client.WebClient; -import pl.rzrz.assertj.reactor.Assertions; -import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; - -import java.time.Duration; -import java.util.List; -import java.util.concurrent.CompletableFuture; - - -@Slf4j -public abstract class AbstractHandoverIT -{ - static final int NUM_CHATROOMS = 23; - static final int NUM_CLIENTS = 17; - - - private final AbstractHandoverITContainers containers; - - - AbstractHandoverIT(AbstractHandoverITContainers containers) - { - this.containers = containers; - } - - - @Test - void test() throws InterruptedException - { - log.info("Starting backend-1..."); - containers.startBackend(containers.backend1, new TestWriter[0]); - log.info("backend-1 started!"); - - ChatRoomInfoTo[] chatRooms = Flux - .range(0, NUM_CHATROOMS) - .flatMap(i -> createChatRoom("room-" + i)) - .toStream() - .toArray(size -> new ChatRoomInfoTo[size]); - - int port = containers.haproxy.getMappedPort(8400); - - CompletableFuture[] testWriterFutures = new CompletableFuture[NUM_CLIENTS]; - TestWriter[] testWriters = new TestWriter[NUM_CLIENTS]; - for (int i = 0; i < NUM_CLIENTS; i++) - { - TestWriter testWriter = new TestWriter( - port, - chatRooms[i % NUM_CHATROOMS], - "user-" + i); - testWriters[i] = testWriter; - testWriterFutures[i] = testWriter - .run() - .toFuture(); - } - - TestListener testListener = new TestListener(port, chatRooms); - testListener - .run() - .subscribe(message -> log.info( - "Received message: {}", - message)); - - log.info("Starting backend-2..."); - containers.startBackend(containers.backend2, testWriters); - log.info("backend-2 started!"); - - log.info("Starting backend-3..."); - containers.startBackend(containers.backend3, testWriters); - log.info("backend-3 started!"); - - for (int i = 0; i < NUM_CLIENTS; i++) - { - testWriters[i].running = false; - testWriterFutures[i].join(); - log.info("Joined TestWriter {}", testWriters[i].user); - } - - Awaitility - .await() - .atMost(Duration.ofSeconds(5)) - .untilAsserted(() -> assertAllSentMessagesReceived(testWriters, testListener)); - } - - private void assertAllSentMessagesReceived( - TestWriter[] testWriters, - TestListener testListener) - { - for (int i = 0; i < NUM_CLIENTS; i++) - { - TestWriter testWriter = testWriters[i]; - ChatRoomInfoTo chatRoom = testWriter.chatRoom; - List receivedMessages = testListener.receivedMessages.get(chatRoom.getId()); - - Assertions.assertThat(receivedMessages - .stream() - .filter(message -> message.getUser().equals(testWriter.user.getName())) - ).containsExactlyElementsOf(testWriter.sentMessages); - } - } - - Mono createChatRoom(String name) - { - return webClient - .post() - .uri("/create") - .contentType(MediaType.TEXT_PLAIN) - .bodyValue(name) - .accept(MediaType.APPLICATION_JSON) - .exchangeToMono(response -> - { - if (response.statusCode().equals(HttpStatus.OK)) - { - return response.bodyToMono(ChatRoomInfoTo.class); - } - else - { - return response.createError(); - } - }); - } - - - WebClient webClient; - - @BeforeEach - void setUp() throws Exception - { - containers.setUp(); - - Integer port = containers.haproxy.getMappedPort(8400); - webClient = WebClient.create("http://localhost:" + port); - } -} diff --git a/src/test/java/de/juplo/kafka/chat/backend/AbstractHandoverITContainers.java b/src/test/java/de/juplo/kafka/chat/backend/AbstractHandoverITContainers.java deleted file mode 100644 index 0b245a9e..00000000 --- a/src/test/java/de/juplo/kafka/chat/backend/AbstractHandoverITContainers.java +++ /dev/null @@ -1,256 +0,0 @@ -package de.juplo.kafka.chat.backend; - -import de.juplo.kafka.chat.backend.implementation.haproxy.MapEntryTo; -import de.juplo.kafka.chat.backend.implementation.haproxy.MapInfoTo; -import lombok.extern.slf4j.Slf4j; -import org.awaitility.Awaitility; -import org.springframework.http.HttpStatus; -import org.springframework.http.HttpStatusCode; -import org.springframework.http.MediaType; -import org.springframework.web.reactive.function.client.WebClient; -import org.testcontainers.containers.GenericContainer; -import org.testcontainers.containers.Network; -import org.testcontainers.containers.output.Slf4jLogConsumer; -import org.testcontainers.containers.wait.strategy.Wait; -import org.testcontainers.images.ImagePullPolicy; -import org.testcontainers.utility.DockerImageName; -import org.testcontainers.utility.MountableFile; -import reactor.core.publisher.Mono; -import reactor.util.retry.Retry; - -import java.time.Duration; -import java.time.Instant; -import java.util.Arrays; - -import static de.juplo.kafka.chat.backend.implementation.haproxy.HaproxyDataPlaneApiShardingPublisherStrategy.MAP_PARAM; - - -@Slf4j -public abstract class AbstractHandoverITContainers -{ - static final ImagePullPolicy NEVER_PULL = imageName -> false; - - - final Network network = Network.newNetwork(); - final GenericContainer haproxy, backend1, backend2, backend3; - - - AbstractHandoverITContainers() - { - haproxy = createHaproxyContainer(); - haproxy.start(); - - backend1 = createBackendContainer("1"); - backend2 = createBackendContainer("2"); - backend3 = createBackendContainer("3"); - } - - - void setUpExtra() throws Exception - { - log.info("This setup does not need any extra containers"); - } - - void setUp() throws Exception - { - setUpExtra(); - } - - void startBackend( - GenericContainer backend, - TestWriter[] testWriters) - { - backend.start(); - - int[] numSentMessages = Arrays - .stream(testWriters) - .mapToInt(testWriter -> testWriter.getNumSentMessages()) - .toArray(); - - String backendUri = "http://localhost:" + backend.getMappedPort(8080); - - Instant before, after; - - before = Instant.now(); - HttpStatusCode statusCode = WebClient - .create(backendUri) - .get() - .uri("/actuator/health") - .exchangeToMono(response -> - { - log.info("{} responded with {}", backendUri, response.statusCode()); - return Mono.just(response.statusCode()); - }) - .flatMap(status -> switch (status.value()) - { - case 200, 503 -> Mono.just(status); - default -> Mono.error(new RuntimeException(status.toString())); - }) - .retryWhen(Retry.backoff(30, Duration.ofSeconds(1))) - .block(); - after = Instant.now(); - log.info("Took {} to reach status {}", Duration.between(before, after), statusCode); - - before = Instant.now(); - Awaitility - .await() - .atMost(Duration.ofSeconds(45)) - .until(() -> WebClient - .create(backendUri) - .get() - .uri("/actuator/health") - .exchangeToMono(response -> - { - log.info("{} responded with {}", backendUri, response.statusCode()); - if (response.statusCode().equals(HttpStatus.OK)) - { - return response - .bodyToMono(StatusTo.class) - .map(StatusTo::getStatus) - .map(status -> status.equalsIgnoreCase("UP")); - } - else - { - return Mono.just(false); - } - }) - .block()); - after = Instant.now(); - log.info("Took {} until the backend reported status UP", Duration.between(before, after)); - - haproxy - .getDockerClient() - .killContainerCmd(haproxy.getContainerId()) - .withSignal("HUP") - .exec(); - - before = Instant.now(); - Awaitility - .await() - .atMost(Duration.ofSeconds(15)) - .until(() -> WebClient - .create("http://localhost:" + haproxy.getMappedPort(8400)) - .get() - .uri("/actuator/health") - .exchangeToMono(response -> - { - log.info("{} responded with {}", backendUri, response.statusCode()); - if (response.statusCode().equals(HttpStatus.OK)) - { - return response - .bodyToMono(StatusTo.class) - .map(StatusTo::getStatus) - .map(status -> status.equalsIgnoreCase("UP")); - } - else - { - return Mono.just(false); - } - }) - .block()); - after = Instant.now(); - log.info("Took {} until haproxy reported status UP", Duration.between(before, after)); - - before = Instant.now(); - Awaitility - .await() - .atMost(Duration.ofSeconds(30)) - .until(() -> - { - for (int i = 0; i < testWriters.length; i++) - { - TestWriter testWriter = testWriters[i]; - int sentTotal = testWriter.getNumSentMessages(); - if (sentTotal == numSentMessages[i]) - { - log.info( - "No progress for {}: sent-before={}, sent-total={}, map: {}", - testWriter, - numSentMessages[i], - sentTotal, - readHaproxyMap()); - return false; - } - } - - return true; - }); - after = Instant.now(); - log.info("Took {} until all writers made some progress", Duration.between(before, after)); - } - - abstract String[] getBackendCommand(); - - final GenericContainer createHaproxyContainer() - { - return new GenericContainer(DockerImageName.parse("haproxytech/haproxy-debian:2.8")) - .withCommand("-f", "/etc/haproxy") - .withNetwork(network) - .withNetworkAliases("haproxy") - .withCopyFileToContainer( - MountableFile.forClasspathResource("haproxy.cfg"), - "/etc/haproxy/haproxy.cfg") - .withCopyFileToContainer( - MountableFile.forClasspathResource("dataplaneapi.yml"), - "/etc/haproxy/dataplaneapi.yml") - .withCopyFileToContainer( - MountableFile.forClasspathResource("sharding.map"), - "/etc/haproxy/maps/sharding.map") - .withExposedPorts(8400, 8401, 8404, 5555) - .withLogConsumer(new Slf4jLogConsumer(log, true).withPrefix("HAPROXY")); - } - - final GenericContainer createBackendContainer(String id) - { - return new GenericContainer(DockerImageName.parse("juplo/chat-backend:0.0.1-SNAPSHOT")) - .withImagePullPolicy(NEVER_PULL) - .withNetwork(network) - .withNetworkAliases("backend-ID".replaceAll("ID", id)) - .withCommand(Arrays.stream(getBackendCommand()) - .map(commandPart -> commandPart.replaceAll("ID", id)) - .toArray(size -> new String[size])) - .withExposedPorts(8080) - .waitingFor(Wait.forLogMessage(".*Started\\ ChatBackendApplication.*\\n", 1)) - .withLogConsumer(new Slf4jLogConsumer( - log, - true - ) - .withPrefix("BACKEND-ID".replaceAll("ID", id))); - } - - private String readHaproxyMap() - { - return createHaproxyWebClient() - .get() - .uri(uriBuilder -> uriBuilder - .path("/services/haproxy/runtime/maps_entries") - .queryParam(MAP_PARAM, MAP_NAME) - .build()) - .accept(MediaType.APPLICATION_JSON) - .exchangeToFlux(response -> - { - if (response.statusCode().equals(HttpStatus.OK)) - { - return response.bodyToFlux(MapEntryTo.class); - } - else - { - return response.createError().flux(); - } - }) - .map(entry -> entry.key() + "=" + entry.value()) - .reduce((a, b) -> a + ", " + b) - .block(); - } - - private WebClient createHaproxyWebClient() - { - return WebClient - .builder() - .baseUrl("http://localhost:" + haproxy.getMappedPort(5555) + "/v2/") - .defaultHeaders(httpHeaders -> httpHeaders.setBasicAuth("juplo", "juplo")) - .build(); - } - - static final String MAP_NAME = "sharding"; -} diff --git a/src/test/java/de/juplo/kafka/chat/backend/KafkaHandoverIT.java b/src/test/java/de/juplo/kafka/chat/backend/KafkaHandoverIT.java deleted file mode 100644 index 0a6c75d8..00000000 --- a/src/test/java/de/juplo/kafka/chat/backend/KafkaHandoverIT.java +++ /dev/null @@ -1,13 +0,0 @@ -package de.juplo.kafka.chat.backend; - -import lombok.extern.slf4j.Slf4j; - - -@Slf4j -class KafkaHandoverIT extends AbstractHandoverIT -{ - KafkaHandoverIT() - { - super(new KafkaHandoverITContainers()); - } -} diff --git a/src/test/java/de/juplo/kafka/chat/backend/KafkaHandoverITContainers.java b/src/test/java/de/juplo/kafka/chat/backend/KafkaHandoverITContainers.java deleted file mode 100644 index 9a88012b..00000000 --- a/src/test/java/de/juplo/kafka/chat/backend/KafkaHandoverITContainers.java +++ /dev/null @@ -1,81 +0,0 @@ -package de.juplo.kafka.chat.backend; - -import lombok.extern.slf4j.Slf4j; -import org.testcontainers.containers.Container; -import org.testcontainers.containers.KafkaContainer; -import org.testcontainers.containers.output.Slf4jLogConsumer; -import org.testcontainers.containers.wait.strategy.Wait; -import org.testcontainers.utility.DockerImageName; - -import java.io.IOException; - - -@Slf4j -class KafkaHandoverITContainers extends AbstractHandoverITContainers -{ - @Override - void setUpExtra() throws IOException, InterruptedException - { - kafka.start(); - - Container.ExecResult result; - result = kafka.execInContainer( - "kafka-topics", - "--bootstrap-server", - "kafka:9999", - "--create", - "--topic", - "info_channel", - "--partitions", - "3"); - log.info( - "EXIT-CODE={}, STDOUT={}, STDERR={}", - result.getExitCode(), - result.getStdout(), - result.getStdout()); - result = kafka.execInContainer( - "kafka-topics", - "--bootstrap-server", - "kafka:9999", - "--create", - "--topic", - "data_channel", - "--partitions", - "10"); - log.info( - "EXIT-CODE={}, STDOUT={}, STDERR={}", - result.getExitCode(), - result.getStdout(), - result.getStdout()); - } - - - private final KafkaContainer kafka = - new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.4.0")) - .withNetwork(network) - .withNetworkAliases("kafka") - .withListener(() -> "kafka:9999") - .withKraft() - .waitingFor(Wait.forLogMessage(".*Kafka\\ Server\\ started.*\\n", 1)) - .withLogConsumer(new Slf4jLogConsumer(log, true).withPrefix("KAFKA")); - - - @Override - String[] getBackendCommand() - { - return new String[] - { - "--chat.backend.instance-id=backend_ID", - "--chat.backend.services=kafka", - "--chat.backend.kafka.bootstrap-servers=kafka:9999", - "--chat.backend.kafka.instance-uri=http://backend-ID:8080", - "--chat.backend.kafka.num-partitions=10", - "--chat.backend.kafka.client-id-prefix=BID", - "--chat.backend.kafka.haproxy-data-plane-api=http://haproxy:5555/v2/", - "--chat.backend.kafka.haproxy-user=juplo", - "--chat.backend.kafka.haproxy-password=juplo", - "--chat.backend.kafka.haproxy-map=sharding", - "--logging.level.de.juplo=DEBUG" - }; - } -} diff --git a/src/test/java/de/juplo/kafka/chat/backend/StatusTo.java b/src/test/java/de/juplo/kafka/chat/backend/StatusTo.java deleted file mode 100644 index f6209429..00000000 --- a/src/test/java/de/juplo/kafka/chat/backend/StatusTo.java +++ /dev/null @@ -1,13 +0,0 @@ -package de.juplo.kafka.chat.backend; - -import lombok.Getter; -import lombok.Setter; - - - -@Getter -@Setter -class StatusTo -{ - String status; -} diff --git a/src/test/java/de/juplo/kafka/chat/backend/TestListener.java b/src/test/java/de/juplo/kafka/chat/backend/TestListener.java deleted file mode 100644 index 092cd43d..00000000 --- a/src/test/java/de/juplo/kafka/chat/backend/TestListener.java +++ /dev/null @@ -1,91 +0,0 @@ -package de.juplo.kafka.chat.backend; - -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.SerializationFeature; -import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; -import de.juplo.kafka.chat.backend.api.ChatRoomInfoTo; -import de.juplo.kafka.chat.backend.api.MessageTo; -import lombok.extern.slf4j.Slf4j; -import org.springframework.core.ParameterizedTypeReference; -import org.springframework.http.MediaType; -import org.springframework.http.codec.ServerSentEvent; -import org.springframework.web.reactive.function.client.WebClient; -import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; -import reactor.util.retry.Retry; - -import java.time.Duration; -import java.util.*; - - -@Slf4j -public class TestListener -{ - static final ParameterizedTypeReference> SSE_TYPE = new ParameterizedTypeReference<>() {}; - - - public Flux run() - { - return Flux - .fromArray(chatRooms) - .flatMap(chatRoom -> - { - List list = new LinkedList<>(); - receivedMessages.put(chatRoom.getId(), list); - return receiveMessages(chatRoom); - }); - } - - Flux receiveMessages(ChatRoomInfoTo chatRoom) - { - log.info("Requesting messages for chat-room {}", chatRoom); - List list = receivedMessages.get(chatRoom.getId()); - return receiveServerSentEvents(chatRoom) - .flatMap(sse -> - { - try - { - return Mono.just(objectMapper.readValue(sse.data(), MessageTo.class)); - } - catch (Exception e) - { - return Mono.error(e); - } - }) - .doOnNext(message -> list.add(message)) - .doOnComplete(() -> log.info("Listening to {} was completed!", chatRoom)) - .doOnError(throwalbe -> log.error("Listening to {} failed!", chatRoom, throwalbe)) - .thenMany(Flux.defer(() -> receiveMessages(chatRoom))); - } - - Flux> receiveServerSentEvents(ChatRoomInfoTo chatRoom) - { - return webClient - .get() - .uri( - "/{chatRoomId}/listen", - chatRoom.getId()) - .accept(MediaType.TEXT_EVENT_STREAM) - .header("X-Shard", chatRoom.getShard().toString()) - .retrieve() - .bodyToFlux(SSE_TYPE) - .retryWhen(Retry.fixedDelay(15, Duration.ofSeconds(1))); - } - - - private final WebClient webClient; - private final ChatRoomInfoTo[] chatRooms; - private final ObjectMapper objectMapper; - - final Map> receivedMessages = new HashMap<>(); - - - TestListener(Integer port, ChatRoomInfoTo[] chatRooms) - { - webClient = WebClient.create("http://localhost:" + port); - this.chatRooms = chatRooms; - objectMapper = new ObjectMapper(); - objectMapper.registerModule(new JavaTimeModule()); - objectMapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS); - } -} diff --git a/src/test/java/de/juplo/kafka/chat/backend/TestWriter.java b/src/test/java/de/juplo/kafka/chat/backend/TestWriter.java deleted file mode 100644 index 4220d002..00000000 --- a/src/test/java/de/juplo/kafka/chat/backend/TestWriter.java +++ /dev/null @@ -1,129 +0,0 @@ -package de.juplo.kafka.chat.backend; - -import de.juplo.kafka.chat.backend.api.ChatRoomInfoTo; -import de.juplo.kafka.chat.backend.api.MessageTo; -import lombok.Getter; -import lombok.ToString; -import lombok.extern.slf4j.Slf4j; -import org.springframework.http.HttpStatus; -import org.springframework.http.MediaType; -import org.springframework.web.reactive.function.client.WebClient; -import org.springframework.web.reactive.function.client.WebClientResponseException; -import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; -import reactor.util.retry.Retry; - -import java.nio.charset.Charset; -import java.time.Duration; -import java.util.Iterator; -import java.util.LinkedList; -import java.util.List; -import java.util.concurrent.ThreadLocalRandom; - - -@ToString(of = "user") -@Slf4j -public class TestWriter -{ - public Mono run() - { - return Flux - .fromIterable((Iterable) () -> new Iterator<>() - { - private int i = 0; - - @Override - public boolean hasNext() - { - return running; - } - - @Override - public Integer next() - { - return i++; - } - }) - .delayElements(Duration.ofMillis(ThreadLocalRandom.current().nextLong(500, 1500))) - .map(i -> "Message #" + i) - .concatMap(message -> sendMessage(chatRoom, message) - .doOnError(throwable -> - { - WebClientResponseException e = (WebClientResponseException)throwable; - log.info( - "Could not sent message {} for {}: {}", - message, - user.getName(), - e.getResponseBodyAsString()); - }) - .retryWhen(Retry.fixedDelay(60, Duration.ofSeconds(1)))) - .doOnNext(message -> - { - numSentMessages++; - sentMessages.add(message); - log.info( - "{} sent a message to {}: {}", - user, - chatRoom, - message); - }) - .doOnError(throwable -> - { - WebClientResponseException e = (WebClientResponseException)throwable.getCause(); - log.error( - "{} failed sending a message: {}", - user, - e.getResponseBodyAsString(Charset.defaultCharset())); - }) - .takeUntil(message -> !running) - .doOnComplete(() -> log.info("TestWriter {} is done", user)) - .then(); - } - - private Mono sendMessage( - ChatRoomInfoTo chatRoom, - String message) - { - return webClient - .put() - .uri( - "/{chatRoomId}/{username}/{serial}", - chatRoom.getId(), - user.getName(), - user.nextSerial()) - .contentType(MediaType.TEXT_PLAIN) - .accept(MediaType.APPLICATION_JSON) - .header("X-Shard", chatRoom.getShard().toString()) - .bodyValue(message) - .exchangeToMono(response -> - { - if (response.statusCode().equals(HttpStatus.OK)) - { - return response.bodyToMono(MessageTo.class); - } - else - { - return response.createError(); - } - }); - } - - - private final WebClient webClient; - - final ChatRoomInfoTo chatRoom; - final User user; - final List sentMessages = new LinkedList<>(); - - volatile boolean running = true; - @Getter - private volatile int numSentMessages = 0; - - - TestWriter(Integer port, ChatRoomInfoTo chatRoom, String username) - { - webClient = WebClient.create("http://localhost:" + port); - this.chatRoom = chatRoom; - user = new User(username); - } -} diff --git a/src/test/java/de/juplo/kafka/chat/backend/User.java b/src/test/java/de/juplo/kafka/chat/backend/User.java deleted file mode 100644 index bcd97b46..00000000 --- a/src/test/java/de/juplo/kafka/chat/backend/User.java +++ /dev/null @@ -1,27 +0,0 @@ -package de.juplo.kafka.chat.backend; - -import lombok.EqualsAndHashCode; -import lombok.Getter; -import lombok.ToString; - - -@EqualsAndHashCode(of = "name") -@ToString(of = "name") -class User -{ - @Getter - private final String name; - private int serial = 0; - - - User (String name) - { - this.name = name; - } - - - int nextSerial() - { - return ++serial; - } -} diff --git a/src/test/java/de/juplo/kafka/chat/backend/api/AbstractHandoverIT.java b/src/test/java/de/juplo/kafka/chat/backend/api/AbstractHandoverIT.java new file mode 100644 index 00000000..e92155d6 --- /dev/null +++ b/src/test/java/de/juplo/kafka/chat/backend/api/AbstractHandoverIT.java @@ -0,0 +1,143 @@ +package de.juplo.kafka.chat.backend; + +import de.juplo.kafka.chat.backend.api.ChatRoomInfoTo; +import de.juplo.kafka.chat.backend.api.MessageTo; +import lombok.extern.slf4j.Slf4j; +import org.awaitility.Awaitility; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.springframework.http.HttpStatus; +import org.springframework.http.MediaType; +import org.springframework.web.reactive.function.client.WebClient; +import pl.rzrz.assertj.reactor.Assertions; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import java.time.Duration; +import java.util.List; +import java.util.concurrent.CompletableFuture; + + +@Slf4j +public abstract class AbstractHandoverIT +{ + static final int NUM_CHATROOMS = 23; + static final int NUM_CLIENTS = 17; + + + private final AbstractHandoverITContainers containers; + + + AbstractHandoverIT(AbstractHandoverITContainers containers) + { + this.containers = containers; + } + + + @Test + void test() throws InterruptedException + { + log.info("Starting backend-1..."); + containers.startBackend(containers.backend1, new TestWriter[0]); + log.info("backend-1 started!"); + + ChatRoomInfoTo[] chatRooms = Flux + .range(0, NUM_CHATROOMS) + .flatMap(i -> createChatRoom("room-" + i)) + .toStream() + .toArray(size -> new ChatRoomInfoTo[size]); + + int port = containers.haproxy.getMappedPort(8400); + + CompletableFuture[] testWriterFutures = new CompletableFuture[NUM_CLIENTS]; + TestWriter[] testWriters = new TestWriter[NUM_CLIENTS]; + for (int i = 0; i < NUM_CLIENTS; i++) + { + TestWriter testWriter = new TestWriter( + port, + chatRooms[i % NUM_CHATROOMS], + "user-" + i); + testWriters[i] = testWriter; + testWriterFutures[i] = testWriter + .run() + .toFuture(); + } + + TestListener testListener = new TestListener(port, chatRooms); + testListener + .run() + .subscribe(message -> log.info( + "Received message: {}", + message)); + + log.info("Starting backend-2..."); + containers.startBackend(containers.backend2, testWriters); + log.info("backend-2 started!"); + + log.info("Starting backend-3..."); + containers.startBackend(containers.backend3, testWriters); + log.info("backend-3 started!"); + + for (int i = 0; i < NUM_CLIENTS; i++) + { + testWriters[i].running = false; + testWriterFutures[i].join(); + log.info("Joined TestWriter {}", testWriters[i].user); + } + + Awaitility + .await() + .atMost(Duration.ofSeconds(5)) + .untilAsserted(() -> assertAllSentMessagesReceived(testWriters, testListener)); + } + + private void assertAllSentMessagesReceived( + TestWriter[] testWriters, + TestListener testListener) + { + for (int i = 0; i < NUM_CLIENTS; i++) + { + TestWriter testWriter = testWriters[i]; + ChatRoomInfoTo chatRoom = testWriter.chatRoom; + List receivedMessages = testListener.receivedMessages.get(chatRoom.getId()); + + Assertions.assertThat(receivedMessages + .stream() + .filter(message -> message.getUser().equals(testWriter.user.getName())) + ).containsExactlyElementsOf(testWriter.sentMessages); + } + } + + Mono createChatRoom(String name) + { + return webClient + .post() + .uri("/create") + .contentType(MediaType.TEXT_PLAIN) + .bodyValue(name) + .accept(MediaType.APPLICATION_JSON) + .exchangeToMono(response -> + { + if (response.statusCode().equals(HttpStatus.OK)) + { + return response.bodyToMono(ChatRoomInfoTo.class); + } + else + { + return response.createError(); + } + }); + } + + + WebClient webClient; + + @BeforeEach + void setUp() throws Exception + { + containers.setUp(); + + Integer port = containers.haproxy.getMappedPort(8400); + webClient = WebClient.create("http://localhost:" + port); + } +} diff --git a/src/test/java/de/juplo/kafka/chat/backend/api/AbstractHandoverITContainers.java b/src/test/java/de/juplo/kafka/chat/backend/api/AbstractHandoverITContainers.java new file mode 100644 index 00000000..0b245a9e --- /dev/null +++ b/src/test/java/de/juplo/kafka/chat/backend/api/AbstractHandoverITContainers.java @@ -0,0 +1,256 @@ +package de.juplo.kafka.chat.backend; + +import de.juplo.kafka.chat.backend.implementation.haproxy.MapEntryTo; +import de.juplo.kafka.chat.backend.implementation.haproxy.MapInfoTo; +import lombok.extern.slf4j.Slf4j; +import org.awaitility.Awaitility; +import org.springframework.http.HttpStatus; +import org.springframework.http.HttpStatusCode; +import org.springframework.http.MediaType; +import org.springframework.web.reactive.function.client.WebClient; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.Network; +import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.containers.wait.strategy.Wait; +import org.testcontainers.images.ImagePullPolicy; +import org.testcontainers.utility.DockerImageName; +import org.testcontainers.utility.MountableFile; +import reactor.core.publisher.Mono; +import reactor.util.retry.Retry; + +import java.time.Duration; +import java.time.Instant; +import java.util.Arrays; + +import static de.juplo.kafka.chat.backend.implementation.haproxy.HaproxyDataPlaneApiShardingPublisherStrategy.MAP_PARAM; + + +@Slf4j +public abstract class AbstractHandoverITContainers +{ + static final ImagePullPolicy NEVER_PULL = imageName -> false; + + + final Network network = Network.newNetwork(); + final GenericContainer haproxy, backend1, backend2, backend3; + + + AbstractHandoverITContainers() + { + haproxy = createHaproxyContainer(); + haproxy.start(); + + backend1 = createBackendContainer("1"); + backend2 = createBackendContainer("2"); + backend3 = createBackendContainer("3"); + } + + + void setUpExtra() throws Exception + { + log.info("This setup does not need any extra containers"); + } + + void setUp() throws Exception + { + setUpExtra(); + } + + void startBackend( + GenericContainer backend, + TestWriter[] testWriters) + { + backend.start(); + + int[] numSentMessages = Arrays + .stream(testWriters) + .mapToInt(testWriter -> testWriter.getNumSentMessages()) + .toArray(); + + String backendUri = "http://localhost:" + backend.getMappedPort(8080); + + Instant before, after; + + before = Instant.now(); + HttpStatusCode statusCode = WebClient + .create(backendUri) + .get() + .uri("/actuator/health") + .exchangeToMono(response -> + { + log.info("{} responded with {}", backendUri, response.statusCode()); + return Mono.just(response.statusCode()); + }) + .flatMap(status -> switch (status.value()) + { + case 200, 503 -> Mono.just(status); + default -> Mono.error(new RuntimeException(status.toString())); + }) + .retryWhen(Retry.backoff(30, Duration.ofSeconds(1))) + .block(); + after = Instant.now(); + log.info("Took {} to reach status {}", Duration.between(before, after), statusCode); + + before = Instant.now(); + Awaitility + .await() + .atMost(Duration.ofSeconds(45)) + .until(() -> WebClient + .create(backendUri) + .get() + .uri("/actuator/health") + .exchangeToMono(response -> + { + log.info("{} responded with {}", backendUri, response.statusCode()); + if (response.statusCode().equals(HttpStatus.OK)) + { + return response + .bodyToMono(StatusTo.class) + .map(StatusTo::getStatus) + .map(status -> status.equalsIgnoreCase("UP")); + } + else + { + return Mono.just(false); + } + }) + .block()); + after = Instant.now(); + log.info("Took {} until the backend reported status UP", Duration.between(before, after)); + + haproxy + .getDockerClient() + .killContainerCmd(haproxy.getContainerId()) + .withSignal("HUP") + .exec(); + + before = Instant.now(); + Awaitility + .await() + .atMost(Duration.ofSeconds(15)) + .until(() -> WebClient + .create("http://localhost:" + haproxy.getMappedPort(8400)) + .get() + .uri("/actuator/health") + .exchangeToMono(response -> + { + log.info("{} responded with {}", backendUri, response.statusCode()); + if (response.statusCode().equals(HttpStatus.OK)) + { + return response + .bodyToMono(StatusTo.class) + .map(StatusTo::getStatus) + .map(status -> status.equalsIgnoreCase("UP")); + } + else + { + return Mono.just(false); + } + }) + .block()); + after = Instant.now(); + log.info("Took {} until haproxy reported status UP", Duration.between(before, after)); + + before = Instant.now(); + Awaitility + .await() + .atMost(Duration.ofSeconds(30)) + .until(() -> + { + for (int i = 0; i < testWriters.length; i++) + { + TestWriter testWriter = testWriters[i]; + int sentTotal = testWriter.getNumSentMessages(); + if (sentTotal == numSentMessages[i]) + { + log.info( + "No progress for {}: sent-before={}, sent-total={}, map: {}", + testWriter, + numSentMessages[i], + sentTotal, + readHaproxyMap()); + return false; + } + } + + return true; + }); + after = Instant.now(); + log.info("Took {} until all writers made some progress", Duration.between(before, after)); + } + + abstract String[] getBackendCommand(); + + final GenericContainer createHaproxyContainer() + { + return new GenericContainer(DockerImageName.parse("haproxytech/haproxy-debian:2.8")) + .withCommand("-f", "/etc/haproxy") + .withNetwork(network) + .withNetworkAliases("haproxy") + .withCopyFileToContainer( + MountableFile.forClasspathResource("haproxy.cfg"), + "/etc/haproxy/haproxy.cfg") + .withCopyFileToContainer( + MountableFile.forClasspathResource("dataplaneapi.yml"), + "/etc/haproxy/dataplaneapi.yml") + .withCopyFileToContainer( + MountableFile.forClasspathResource("sharding.map"), + "/etc/haproxy/maps/sharding.map") + .withExposedPorts(8400, 8401, 8404, 5555) + .withLogConsumer(new Slf4jLogConsumer(log, true).withPrefix("HAPROXY")); + } + + final GenericContainer createBackendContainer(String id) + { + return new GenericContainer(DockerImageName.parse("juplo/chat-backend:0.0.1-SNAPSHOT")) + .withImagePullPolicy(NEVER_PULL) + .withNetwork(network) + .withNetworkAliases("backend-ID".replaceAll("ID", id)) + .withCommand(Arrays.stream(getBackendCommand()) + .map(commandPart -> commandPart.replaceAll("ID", id)) + .toArray(size -> new String[size])) + .withExposedPorts(8080) + .waitingFor(Wait.forLogMessage(".*Started\\ ChatBackendApplication.*\\n", 1)) + .withLogConsumer(new Slf4jLogConsumer( + log, + true + ) + .withPrefix("BACKEND-ID".replaceAll("ID", id))); + } + + private String readHaproxyMap() + { + return createHaproxyWebClient() + .get() + .uri(uriBuilder -> uriBuilder + .path("/services/haproxy/runtime/maps_entries") + .queryParam(MAP_PARAM, MAP_NAME) + .build()) + .accept(MediaType.APPLICATION_JSON) + .exchangeToFlux(response -> + { + if (response.statusCode().equals(HttpStatus.OK)) + { + return response.bodyToFlux(MapEntryTo.class); + } + else + { + return response.createError().flux(); + } + }) + .map(entry -> entry.key() + "=" + entry.value()) + .reduce((a, b) -> a + ", " + b) + .block(); + } + + private WebClient createHaproxyWebClient() + { + return WebClient + .builder() + .baseUrl("http://localhost:" + haproxy.getMappedPort(5555) + "/v2/") + .defaultHeaders(httpHeaders -> httpHeaders.setBasicAuth("juplo", "juplo")) + .build(); + } + + static final String MAP_NAME = "sharding"; +} diff --git a/src/test/java/de/juplo/kafka/chat/backend/api/KafkaHandoverIT.java b/src/test/java/de/juplo/kafka/chat/backend/api/KafkaHandoverIT.java new file mode 100644 index 00000000..0a6c75d8 --- /dev/null +++ b/src/test/java/de/juplo/kafka/chat/backend/api/KafkaHandoverIT.java @@ -0,0 +1,13 @@ +package de.juplo.kafka.chat.backend; + +import lombok.extern.slf4j.Slf4j; + + +@Slf4j +class KafkaHandoverIT extends AbstractHandoverIT +{ + KafkaHandoverIT() + { + super(new KafkaHandoverITContainers()); + } +} diff --git a/src/test/java/de/juplo/kafka/chat/backend/api/KafkaHandoverITContainers.java b/src/test/java/de/juplo/kafka/chat/backend/api/KafkaHandoverITContainers.java new file mode 100644 index 00000000..9a88012b --- /dev/null +++ b/src/test/java/de/juplo/kafka/chat/backend/api/KafkaHandoverITContainers.java @@ -0,0 +1,81 @@ +package de.juplo.kafka.chat.backend; + +import lombok.extern.slf4j.Slf4j; +import org.testcontainers.containers.Container; +import org.testcontainers.containers.KafkaContainer; +import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.containers.wait.strategy.Wait; +import org.testcontainers.utility.DockerImageName; + +import java.io.IOException; + + +@Slf4j +class KafkaHandoverITContainers extends AbstractHandoverITContainers +{ + @Override + void setUpExtra() throws IOException, InterruptedException + { + kafka.start(); + + Container.ExecResult result; + result = kafka.execInContainer( + "kafka-topics", + "--bootstrap-server", + "kafka:9999", + "--create", + "--topic", + "info_channel", + "--partitions", + "3"); + log.info( + "EXIT-CODE={}, STDOUT={}, STDERR={}", + result.getExitCode(), + result.getStdout(), + result.getStdout()); + result = kafka.execInContainer( + "kafka-topics", + "--bootstrap-server", + "kafka:9999", + "--create", + "--topic", + "data_channel", + "--partitions", + "10"); + log.info( + "EXIT-CODE={}, STDOUT={}, STDERR={}", + result.getExitCode(), + result.getStdout(), + result.getStdout()); + } + + + private final KafkaContainer kafka = + new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.4.0")) + .withNetwork(network) + .withNetworkAliases("kafka") + .withListener(() -> "kafka:9999") + .withKraft() + .waitingFor(Wait.forLogMessage(".*Kafka\\ Server\\ started.*\\n", 1)) + .withLogConsumer(new Slf4jLogConsumer(log, true).withPrefix("KAFKA")); + + + @Override + String[] getBackendCommand() + { + return new String[] + { + "--chat.backend.instance-id=backend_ID", + "--chat.backend.services=kafka", + "--chat.backend.kafka.bootstrap-servers=kafka:9999", + "--chat.backend.kafka.instance-uri=http://backend-ID:8080", + "--chat.backend.kafka.num-partitions=10", + "--chat.backend.kafka.client-id-prefix=BID", + "--chat.backend.kafka.haproxy-data-plane-api=http://haproxy:5555/v2/", + "--chat.backend.kafka.haproxy-user=juplo", + "--chat.backend.kafka.haproxy-password=juplo", + "--chat.backend.kafka.haproxy-map=sharding", + "--logging.level.de.juplo=DEBUG" + }; + } +} diff --git a/src/test/java/de/juplo/kafka/chat/backend/api/StatusTo.java b/src/test/java/de/juplo/kafka/chat/backend/api/StatusTo.java new file mode 100644 index 00000000..f6209429 --- /dev/null +++ b/src/test/java/de/juplo/kafka/chat/backend/api/StatusTo.java @@ -0,0 +1,13 @@ +package de.juplo.kafka.chat.backend; + +import lombok.Getter; +import lombok.Setter; + + + +@Getter +@Setter +class StatusTo +{ + String status; +} diff --git a/src/test/java/de/juplo/kafka/chat/backend/api/TestListener.java b/src/test/java/de/juplo/kafka/chat/backend/api/TestListener.java new file mode 100644 index 00000000..092cd43d --- /dev/null +++ b/src/test/java/de/juplo/kafka/chat/backend/api/TestListener.java @@ -0,0 +1,91 @@ +package de.juplo.kafka.chat.backend; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.SerializationFeature; +import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; +import de.juplo.kafka.chat.backend.api.ChatRoomInfoTo; +import de.juplo.kafka.chat.backend.api.MessageTo; +import lombok.extern.slf4j.Slf4j; +import org.springframework.core.ParameterizedTypeReference; +import org.springframework.http.MediaType; +import org.springframework.http.codec.ServerSentEvent; +import org.springframework.web.reactive.function.client.WebClient; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.util.retry.Retry; + +import java.time.Duration; +import java.util.*; + + +@Slf4j +public class TestListener +{ + static final ParameterizedTypeReference> SSE_TYPE = new ParameterizedTypeReference<>() {}; + + + public Flux run() + { + return Flux + .fromArray(chatRooms) + .flatMap(chatRoom -> + { + List list = new LinkedList<>(); + receivedMessages.put(chatRoom.getId(), list); + return receiveMessages(chatRoom); + }); + } + + Flux receiveMessages(ChatRoomInfoTo chatRoom) + { + log.info("Requesting messages for chat-room {}", chatRoom); + List list = receivedMessages.get(chatRoom.getId()); + return receiveServerSentEvents(chatRoom) + .flatMap(sse -> + { + try + { + return Mono.just(objectMapper.readValue(sse.data(), MessageTo.class)); + } + catch (Exception e) + { + return Mono.error(e); + } + }) + .doOnNext(message -> list.add(message)) + .doOnComplete(() -> log.info("Listening to {} was completed!", chatRoom)) + .doOnError(throwalbe -> log.error("Listening to {} failed!", chatRoom, throwalbe)) + .thenMany(Flux.defer(() -> receiveMessages(chatRoom))); + } + + Flux> receiveServerSentEvents(ChatRoomInfoTo chatRoom) + { + return webClient + .get() + .uri( + "/{chatRoomId}/listen", + chatRoom.getId()) + .accept(MediaType.TEXT_EVENT_STREAM) + .header("X-Shard", chatRoom.getShard().toString()) + .retrieve() + .bodyToFlux(SSE_TYPE) + .retryWhen(Retry.fixedDelay(15, Duration.ofSeconds(1))); + } + + + private final WebClient webClient; + private final ChatRoomInfoTo[] chatRooms; + private final ObjectMapper objectMapper; + + final Map> receivedMessages = new HashMap<>(); + + + TestListener(Integer port, ChatRoomInfoTo[] chatRooms) + { + webClient = WebClient.create("http://localhost:" + port); + this.chatRooms = chatRooms; + objectMapper = new ObjectMapper(); + objectMapper.registerModule(new JavaTimeModule()); + objectMapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS); + } +} diff --git a/src/test/java/de/juplo/kafka/chat/backend/api/TestWriter.java b/src/test/java/de/juplo/kafka/chat/backend/api/TestWriter.java new file mode 100644 index 00000000..4220d002 --- /dev/null +++ b/src/test/java/de/juplo/kafka/chat/backend/api/TestWriter.java @@ -0,0 +1,129 @@ +package de.juplo.kafka.chat.backend; + +import de.juplo.kafka.chat.backend.api.ChatRoomInfoTo; +import de.juplo.kafka.chat.backend.api.MessageTo; +import lombok.Getter; +import lombok.ToString; +import lombok.extern.slf4j.Slf4j; +import org.springframework.http.HttpStatus; +import org.springframework.http.MediaType; +import org.springframework.web.reactive.function.client.WebClient; +import org.springframework.web.reactive.function.client.WebClientResponseException; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.util.retry.Retry; + +import java.nio.charset.Charset; +import java.time.Duration; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.ThreadLocalRandom; + + +@ToString(of = "user") +@Slf4j +public class TestWriter +{ + public Mono run() + { + return Flux + .fromIterable((Iterable) () -> new Iterator<>() + { + private int i = 0; + + @Override + public boolean hasNext() + { + return running; + } + + @Override + public Integer next() + { + return i++; + } + }) + .delayElements(Duration.ofMillis(ThreadLocalRandom.current().nextLong(500, 1500))) + .map(i -> "Message #" + i) + .concatMap(message -> sendMessage(chatRoom, message) + .doOnError(throwable -> + { + WebClientResponseException e = (WebClientResponseException)throwable; + log.info( + "Could not sent message {} for {}: {}", + message, + user.getName(), + e.getResponseBodyAsString()); + }) + .retryWhen(Retry.fixedDelay(60, Duration.ofSeconds(1)))) + .doOnNext(message -> + { + numSentMessages++; + sentMessages.add(message); + log.info( + "{} sent a message to {}: {}", + user, + chatRoom, + message); + }) + .doOnError(throwable -> + { + WebClientResponseException e = (WebClientResponseException)throwable.getCause(); + log.error( + "{} failed sending a message: {}", + user, + e.getResponseBodyAsString(Charset.defaultCharset())); + }) + .takeUntil(message -> !running) + .doOnComplete(() -> log.info("TestWriter {} is done", user)) + .then(); + } + + private Mono sendMessage( + ChatRoomInfoTo chatRoom, + String message) + { + return webClient + .put() + .uri( + "/{chatRoomId}/{username}/{serial}", + chatRoom.getId(), + user.getName(), + user.nextSerial()) + .contentType(MediaType.TEXT_PLAIN) + .accept(MediaType.APPLICATION_JSON) + .header("X-Shard", chatRoom.getShard().toString()) + .bodyValue(message) + .exchangeToMono(response -> + { + if (response.statusCode().equals(HttpStatus.OK)) + { + return response.bodyToMono(MessageTo.class); + } + else + { + return response.createError(); + } + }); + } + + + private final WebClient webClient; + + final ChatRoomInfoTo chatRoom; + final User user; + final List sentMessages = new LinkedList<>(); + + volatile boolean running = true; + @Getter + private volatile int numSentMessages = 0; + + + TestWriter(Integer port, ChatRoomInfoTo chatRoom, String username) + { + webClient = WebClient.create("http://localhost:" + port); + this.chatRoom = chatRoom; + user = new User(username); + } +} diff --git a/src/test/java/de/juplo/kafka/chat/backend/api/User.java b/src/test/java/de/juplo/kafka/chat/backend/api/User.java new file mode 100644 index 00000000..bcd97b46 --- /dev/null +++ b/src/test/java/de/juplo/kafka/chat/backend/api/User.java @@ -0,0 +1,27 @@ +package de.juplo.kafka.chat.backend; + +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.ToString; + + +@EqualsAndHashCode(of = "name") +@ToString(of = "name") +class User +{ + @Getter + private final String name; + private int serial = 0; + + + User (String name) + { + this.name = name; + } + + + int nextSerial() + { + return ++serial; + } +}