From b29c52c64b970cb6452827923183053d6d4cdc44 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sat, 24 Feb 2024 12:35:14 +0100 Subject: [PATCH] WIP:setup --- .../kafka/chat/backend/KafkaHandoverIT.java | 160 ++++++++++++------ 1 file changed, 112 insertions(+), 48 deletions(-) diff --git a/src/test/java/de/juplo/kafka/chat/backend/KafkaHandoverIT.java b/src/test/java/de/juplo/kafka/chat/backend/KafkaHandoverIT.java index 2f74b01f..0feed857 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/KafkaHandoverIT.java +++ b/src/test/java/de/juplo/kafka/chat/backend/KafkaHandoverIT.java @@ -1,24 +1,29 @@ package de.juplo.kafka.chat.backend; -import com.fasterxml.jackson.databind.ObjectMapper; import de.juplo.kafka.chat.backend.api.ChatRoomInfoTo; +import de.juplo.kafka.chat.backend.api.MessageTo; +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.ToString; import lombok.extern.slf4j.Slf4j; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.springframework.http.HttpStatus; import org.springframework.http.MediaType; -import org.springframework.test.web.reactive.server.WebTestClient; +import org.springframework.web.reactive.function.client.WebClient; import org.testcontainers.containers.BindMode; import org.testcontainers.containers.GenericContainer; import org.testcontainers.containers.KafkaContainer; import org.testcontainers.containers.Network; import org.testcontainers.containers.output.Slf4jLogConsumer; import org.testcontainers.containers.wait.strategy.Wait; +import org.testcontainers.images.builder.Transferable; import org.testcontainers.junit.jupiter.Container; -import org.testcontainers.shaded.org.awaitility.Awaitility; import org.testcontainers.utility.DockerImageName; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; -import java.time.Duration; -import java.util.UUID; +import java.util.stream.IntStream; @Slf4j @@ -27,57 +32,85 @@ class KafkaHandoverIT extends AbstractHandoverIT @Test void test() throws InterruptedException { - ObjectMapper objectMapper = new ObjectMapper(); - Awaitility - .await() - .atMost(Duration.ofSeconds(15)) - .untilAsserted(() -> + ChatRoomInfoTo chatRoom = createChatRoom("bar").block(); + User user = new User("nerd"); + IntStream + .rangeClosed(1,100) + .forEach(i ->sendMessage(chatRoom, user, "Message #" + i)); + + Thread.sleep(10000); + receiveMessage(chatRoom).subscribe(message -> log.info("message: {}", message)); + } + + Mono createChatRoom(String name) + { + return webClient + .post() + .uri("/create") + .contentType(MediaType.TEXT_PLAIN) + .bodyValue(name) + .accept(MediaType.APPLICATION_JSON) + .exchangeToMono(response -> + { + if (response.statusCode().equals(HttpStatus.OK)) + { + return response.bodyToMono(ChatRoomInfoTo.class); + } + else + { + return response.createError(); + } + }); + } + + Mono sendMessage( + ChatRoomInfoTo chatRoom, + User user, + String message) + { + return webClient + .put() + .uri( + "/{chatRoomId}/{username}/{serial}", + chatRoom.getId(), + user.getName(), + user.nextSerial()) + .contentType(MediaType.TEXT_PLAIN) + .accept(MediaType.APPLICATION_JSON) + .bodyValue(message) + .exchangeToMono(response -> { - byte[] result = webTestClient - .post() - .uri("/create") - .contentType(MediaType.TEXT_PLAIN) - .bodyValue("bar") - .accept(MediaType.APPLICATION_JSON) - .exchange() - .expectStatus().isOk() - .expectBody() - .jsonPath("$.id").exists() - .jsonPath("$.name").isEqualTo("bar") - // The hard must not be asserted, because not all implementations ar aware of it - // .jsonPath("$.shard").isEqualTo(Integer.valueOf(2)) - .returnResult() - .getResponseBody(); - ChatRoomInfoTo chatRoomInfo = objectMapper.readValue(result, ChatRoomInfoTo.class); - UUID chatRoomId = chatRoomInfo.getId(); - webTestClient - .put() - .uri( - "/{chatRoomId}/nerd/7", - chatRoomId) - .contentType(MediaType.TEXT_PLAIN) - .accept(MediaType.APPLICATION_JSON) - .bodyValue("Hello world!") - .exchange() - .expectStatus().isOk() - .expectBody() - .jsonPath("$.id").isEqualTo(Integer.valueOf(7)) - .jsonPath("$.user").isEqualTo("nerd") - .jsonPath("$.text").isEqualTo("Hello world!"); + if (response.statusCode().equals(HttpStatus.OK)) + { + return response.bodyToMono(MessageTo.class); + } + else + { + return response.createError(); + } }); + } - Thread.sleep(30000); + Flux receiveMessage(ChatRoomInfoTo chatRoom) + { + return webClient + .get() + .uri( + "/{chatRoomId}", + chatRoom.getId()) + .accept(MediaType.APPLICATION_OCTET_STREAM) + .retrieve().bodyToFlux(byte[].class); } @BeforeEach void setUp() { Integer port = haproxy.getMappedPort(8400); - webTestClient = WebTestClient.bindToServer().baseUrl("http://localhost:" + port).build(); + webClient = WebClient.create("http://localhost:" + port); } - WebTestClient webTestClient; + WebClient webClient; Network network = Network.newNetwork(); @@ -87,11 +120,21 @@ class KafkaHandoverIT extends AbstractHandoverIT .withNetwork(network) .withNetworkAliases("kafka") .withListener(() -> "kafka:9999") - .withEnv("KAFKA_NUM_PARTITIONS", "10") .withKraft() .waitingFor(Wait.forLogMessage(".*Kafka\\ Server\\ started.*\\n", 1)) .withLogConsumer(new Slf4jLogConsumer(log, true).withPrefix("KAFKA")); + @Container + GenericContainer setup = + new GenericContainer<>("confluentinc/cp-kcat:7.4.1") + .withCreateContainerCmdModifier(cmd -> cmd.withEntrypoint("sh", "-c")) + .withNetwork(network) + .withCommand( + "kafka-topics --bootstrap-server kafka:9999 --create --topic info_channel --partitions 3", + "kafka-topics --bootstrap-server kafka:9999 --create --topic data_channel --partitions 10") + .dependsOn(kafka) + .withLogConsumer(new Slf4jLogConsumer(log, true).withPrefix("SETUP")); + @Container GenericContainer backend_1 = new GenericContainer(DockerImageName.parse("juplo/chat-backend:0.0.1-SNAPSHOT")) @@ -108,7 +151,7 @@ class KafkaHandoverIT extends AbstractHandoverIT "--chat.backend.kafka.haproxy-runtime-api=haproxy:8401", "--chat.backend.kafka.haproxy-map=/usr/local/etc/haproxy/sharding.map" ) - .dependsOn(kafka) + .dependsOn(setup) .waitingFor(Wait.forLogMessage(".*Started\\ ChatBackendApplication.*\\n", 1)) .withLogConsumer(new Slf4jLogConsumer(log, true).withPrefix("BACKEND-1")); @@ -128,7 +171,7 @@ class KafkaHandoverIT extends AbstractHandoverIT "--chat.backend.kafka.haproxy-runtime-api=haproxy:8401", "--chat.backend.kafka.haproxy-map=/usr/local/etc/haproxy/sharding.map" ) - .dependsOn(kafka) + .dependsOn(setup) .waitingFor(Wait.forLogMessage(".*Started\\ ChatBackendApplication.*\\n", 1)) .withLogConsumer(new Slf4jLogConsumer(log, true).withPrefix("BACKEND-2")); @@ -148,7 +191,7 @@ class KafkaHandoverIT extends AbstractHandoverIT "--chat.backend.kafka.haproxy-runtime-api=haproxy:8401", "--chat.backend.kafka.haproxy-map=/usr/local/etc/haproxy/sharding.map" ) - .dependsOn(kafka) + .dependsOn(setup) .waitingFor(Wait.forLogMessage(".*Started\\ ChatBackendApplication.*\\n", 1)) .withLogConsumer(new Slf4jLogConsumer(log, true).withPrefix("BACKEND-3")); @@ -168,4 +211,25 @@ class KafkaHandoverIT extends AbstractHandoverIT .withExposedPorts(8400) // , 8401, 8404, 5555) .dependsOn(backend_1, backend_2, backend_3) .withLogConsumer(new Slf4jLogConsumer(log, true).withPrefix("HAPROXY")); + + @EqualsAndHashCode + @ToString + class User + { + @Getter + private final String name; + private int serial = 0; + + + User (String name) + { + this.name = name; + } + + + int nextSerial() + { + return ++serial; + } + } } -- 2.20.1