From: Kai Moritz Date: Fri, 23 Feb 2024 09:37:11 +0000 (+0100) Subject: WIP:neu X-Git-Tag: rebase--2024-02-26--19-46~26 X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=2c3e158b82ca13efab4d1b446caf76c9ab0f0b5c;p=demos%2Fkafka%2Fchat WIP:neu --- diff --git a/src/test/java/de/juplo/kafka/chat/backend/KafkaHandoverIT.java b/src/test/java/de/juplo/kafka/chat/backend/KafkaHandoverIT.java index d2059624..3d24daf4 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/KafkaHandoverIT.java +++ b/src/test/java/de/juplo/kafka/chat/backend/KafkaHandoverIT.java @@ -1,41 +1,95 @@ package de.juplo.kafka.chat.backend; -import de.juplo.kafka.chat.backend.implementation.kafka.ConsumerTaskRunner; -import de.juplo.kafka.chat.backend.implementation.kafka.KafkaServicesApplicationRunner; -import de.juplo.kafka.chat.backend.implementation.kafka.KafkaTestUtils; +import com.fasterxml.jackson.databind.ObjectMapper; +import de.juplo.kafka.chat.backend.api.ChatRoomInfoTo; import lombok.extern.slf4j.Slf4j; -import org.junit.jupiter.api.AfterAll; -import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.boot.test.context.SpringBootTest; -import org.springframework.boot.test.context.TestConfiguration; -import org.springframework.boot.test.mock.mockito.MockBean; -import org.springframework.context.annotation.Import; -import org.springframework.kafka.core.KafkaTemplate; -import org.springframework.kafka.test.context.EmbeddedKafka; +import org.springframework.http.MediaType; +import org.springframework.test.web.reactive.server.WebTestClient; import org.testcontainers.containers.BindMode; import org.testcontainers.containers.GenericContainer; import org.testcontainers.containers.KafkaContainer; import org.testcontainers.containers.Network; import org.testcontainers.containers.output.Slf4jLogConsumer; -import org.testcontainers.images.ImagePullPolicy; import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.shaded.org.awaitility.Awaitility; import org.testcontainers.utility.DockerImageName; -import static de.juplo.kafka.chat.backend.KafkaConfigurationIT.DATA_TOPIC; -import static de.juplo.kafka.chat.backend.KafkaConfigurationIT.INFO_TOPIC; +import java.time.Duration; +import java.util.UUID; @Slf4j class KafkaHandoverIT extends AbstractHandoverIT { - Network network = Network.newNetwork(); + @Test + void test() throws InterruptedException + { + ObjectMapper objectMapper = new ObjectMapper(); + Awaitility + .await() + .atMost(Duration.ofSeconds(15)) + .untilAsserted(() -> + { + byte[] result = webTestClient + .post() + .uri("/create") + .contentType(MediaType.TEXT_PLAIN) + .bodyValue("bar") + .accept(MediaType.APPLICATION_JSON) + .exchange() + .expectStatus().isOk() + .expectBody() + .jsonPath("$.id").exists() + .jsonPath("$.name").isEqualTo("bar") + // The hard must not be asserted, because not all implementations ar aware of it + // .jsonPath("$.shard").isEqualTo(Integer.valueOf(2)) + .returnResult() + .getResponseBody(); + ChatRoomInfoTo chatRoomInfo = objectMapper.readValue(result, ChatRoomInfoTo.class); + UUID chatRoomId = chatRoomInfo.getId(); + webTestClient + .put() + .uri( + "/{chatRoomId}/nerd/7", + chatRoomId) + .contentType(MediaType.TEXT_PLAIN) + .accept(MediaType.APPLICATION_JSON) + .bodyValue("Hello world!") + .exchange() + .expectStatus().isOk() + .expectBody() + .jsonPath("$.id").isEqualTo(Integer.valueOf(7)) + .jsonPath("$.user").isEqualTo("nerd") + .jsonPath("$.text").isEqualTo("Hello world!"); + }); + } + + @BeforeEach + void setUp() + { + Integer port = haproxy.getMappedPort(8400); + webTestClient = WebTestClient.bindToServer().baseUrl("http://localhost:" + port).build(); + + Awaitility + .await() + .atMost(Duration.ofSeconds(15)) + .untilAsserted(() -> webTestClient + .get() + .uri("/actuator/health") + .exchange() + .expectStatus().isOk() + .expectBody().jsonPath("$.status").isEqualTo("UP")); + } + + + WebTestClient webTestClient; + Network network = Network.newNetwork(); @Container - GenericContainer HAPROXY = + GenericContainer haproxy = new GenericContainer(DockerImageName.parse("haproxytech/haproxy-debian:2.8")) .withNetwork(network) .withNetworkAliases("haproxy") @@ -43,20 +97,25 @@ class KafkaHandoverIT extends AbstractHandoverIT "haproxy.cfg", "/usr/local/etc/haproxy/haproxy.cfg", BindMode.READ_ONLY) - .withExposedPorts(8400, 8401, 8404, 5555) + .withClasspathResourceMapping( + "sharding.map", + "/usr/local/etc/haproxy/sharding.map", + BindMode.READ_WRITE) + .withExposedPorts(8400) // , 8401, 8404, 5555) .withLogConsumer(new Slf4jLogConsumer(log, true).withPrefix("HAPROXY")); @Container - KafkaContainer KAFKA = + KafkaContainer kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.4.0")) .withNetwork(network) .withNetworkAliases("kafka") .withListener(() -> "kafka:9999") + .withEnv("KAFKA_NUM_PARTITIONS", "10") .withKraft() .withLogConsumer(new Slf4jLogConsumer(log, true).withPrefix("KAFKA")); @Container - GenericContainer BACKEND_1 = + GenericContainer backend_1 = new GenericContainer(DockerImageName.parse("juplo/chat-backend:0.0.1-SNAPSHOT")) .withImagePullPolicy(NEVER_PULL) .withNetwork(network) @@ -74,7 +133,7 @@ class KafkaHandoverIT extends AbstractHandoverIT .withLogConsumer(new Slf4jLogConsumer(log, true).withPrefix("BACKEND-1")); @Container - GenericContainer BACKEND_2 = + GenericContainer backend_2 = new GenericContainer(DockerImageName.parse("juplo/chat-backend:0.0.1-SNAPSHOT")) .withImagePullPolicy(NEVER_PULL) .withNetwork(network) @@ -92,7 +151,7 @@ class KafkaHandoverIT extends AbstractHandoverIT .withLogConsumer(new Slf4jLogConsumer(log, true).withPrefix("BACKEND-2")); @Container - GenericContainer BACKEND_3 = + GenericContainer backend_3 = new GenericContainer(DockerImageName.parse("juplo/chat-backend:0.0.1-SNAPSHOT")) .withImagePullPolicy(NEVER_PULL) .withNetwork(network) @@ -108,16 +167,4 @@ class KafkaHandoverIT extends AbstractHandoverIT "--chat.backend.kafka.haproxy-map=/usr/local/etc/haproxy/sharding.map" ) .withLogConsumer(new Slf4jLogConsumer(log, true).withPrefix("BACKEND-3")); - - - @BeforeAll - static void setUp() - { - } - - @Test - void test() throws InterruptedException - { - Thread.sleep(150000); - } }