WIP:setup
authorKai Moritz <kai@juplo.de>
Sat, 24 Feb 2024 11:35:14 +0000 (12:35 +0100)
committerKai Moritz <kai@juplo.de>
Sat, 24 Feb 2024 11:35:14 +0000 (12:35 +0100)
src/test/java/de/juplo/kafka/chat/backend/KafkaHandoverIT.java

index 2f74b01..0feed85 100644 (file)
@@ -1,24 +1,29 @@
 package de.juplo.kafka.chat.backend;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
 import de.juplo.kafka.chat.backend.api.ChatRoomInfoTo;
+import de.juplo.kafka.chat.backend.api.MessageTo;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.ToString;
 import lombok.extern.slf4j.Slf4j;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.springframework.http.HttpStatus;
 import org.springframework.http.MediaType;
-import org.springframework.test.web.reactive.server.WebTestClient;
+import org.springframework.web.reactive.function.client.WebClient;
 import org.testcontainers.containers.BindMode;
 import org.testcontainers.containers.GenericContainer;
 import org.testcontainers.containers.KafkaContainer;
 import org.testcontainers.containers.Network;
 import org.testcontainers.containers.output.Slf4jLogConsumer;
 import org.testcontainers.containers.wait.strategy.Wait;
+import org.testcontainers.images.builder.Transferable;
 import org.testcontainers.junit.jupiter.Container;
-import org.testcontainers.shaded.org.awaitility.Awaitility;
 import org.testcontainers.utility.DockerImageName;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
 
-import java.time.Duration;
-import java.util.UUID;
+import java.util.stream.IntStream;
 
 
 @Slf4j
@@ -27,57 +32,85 @@ class KafkaHandoverIT extends AbstractHandoverIT
   @Test
   void test() throws InterruptedException
   {
-    ObjectMapper objectMapper = new ObjectMapper();
-    Awaitility
-        .await()
-        .atMost(Duration.ofSeconds(15))
-        .untilAsserted(() ->
+    ChatRoomInfoTo chatRoom = createChatRoom("bar").block();
+    User user = new User("nerd");
+    IntStream
+        .rangeClosed(1,100)
+        .forEach(i ->sendMessage(chatRoom, user, "Message #" + i));
+
+    Thread.sleep(10000);
+    receiveMessage(chatRoom).subscribe(message -> log.info("message: {}", message));
+  }
+
+  Mono<ChatRoomInfoTo> createChatRoom(String name)
+  {
+    return webClient
+        .post()
+        .uri("/create")
+        .contentType(MediaType.TEXT_PLAIN)
+        .bodyValue(name)
+        .accept(MediaType.APPLICATION_JSON)
+        .exchangeToMono(response ->
+        {
+          if (response.statusCode().equals(HttpStatus.OK))
+          {
+            return response.bodyToMono(ChatRoomInfoTo.class);
+          }
+          else
+          {
+            return response.createError();
+          }
+        });
+  }
+
+  Mono<MessageTo> sendMessage(
+      ChatRoomInfoTo chatRoom,
+      User user,
+      String message)
+  {
+    return webClient
+        .put()
+        .uri(
+            "/{chatRoomId}/{username}/{serial}",
+            chatRoom.getId(),
+            user.getName(),
+            user.nextSerial())
+        .contentType(MediaType.TEXT_PLAIN)
+        .accept(MediaType.APPLICATION_JSON)
+        .bodyValue(message)
+        .exchangeToMono(response ->
         {
-          byte[] result = webTestClient
-              .post()
-              .uri("/create")
-              .contentType(MediaType.TEXT_PLAIN)
-              .bodyValue("bar")
-              .accept(MediaType.APPLICATION_JSON)
-              .exchange()
-              .expectStatus().isOk()
-              .expectBody()
-              .jsonPath("$.id").exists()
-              .jsonPath("$.name").isEqualTo("bar")
-              // The hard must not be asserted, because not all implementations ar aware of it
-              // .jsonPath("$.shard").isEqualTo(Integer.valueOf(2))
-              .returnResult()
-              .getResponseBody();
-          ChatRoomInfoTo chatRoomInfo = objectMapper.readValue(result, ChatRoomInfoTo.class);
-          UUID chatRoomId = chatRoomInfo.getId();
-          webTestClient
-              .put()
-              .uri(
-                  "/{chatRoomId}/nerd/7",
-                  chatRoomId)
-              .contentType(MediaType.TEXT_PLAIN)
-              .accept(MediaType.APPLICATION_JSON)
-              .bodyValue("Hello world!")
-              .exchange()
-              .expectStatus().isOk()
-              .expectBody()
-              .jsonPath("$.id").isEqualTo(Integer.valueOf(7))
-              .jsonPath("$.user").isEqualTo("nerd")
-              .jsonPath("$.text").isEqualTo("Hello world!");
+          if (response.statusCode().equals(HttpStatus.OK))
+          {
+            return response.bodyToMono(MessageTo.class);
+          }
+          else
+          {
+            return response.createError();
+          }
         });
+  }
 
-    Thread.sleep(30000);
+  Flux<byte[]> receiveMessage(ChatRoomInfoTo chatRoom)
+  {
+    return webClient
+        .get()
+        .uri(
+            "/{chatRoomId}",
+            chatRoom.getId())
+        .accept(MediaType.APPLICATION_OCTET_STREAM)
+        .retrieve().bodyToFlux(byte[].class);
   }
 
   @BeforeEach
   void setUp()
   {
     Integer port = haproxy.getMappedPort(8400);
-    webTestClient = WebTestClient.bindToServer().baseUrl("http://localhost:" + port).build();
+    webClient = WebClient.create("http://localhost:" + port);
   }
 
 
-  WebTestClient webTestClient;
+  WebClient webClient;
 
   Network network = Network.newNetwork();
 
@@ -87,11 +120,21 @@ class KafkaHandoverIT extends AbstractHandoverIT
           .withNetwork(network)
           .withNetworkAliases("kafka")
           .withListener(() -> "kafka:9999")
-          .withEnv("KAFKA_NUM_PARTITIONS", "10")
           .withKraft()
           .waitingFor(Wait.forLogMessage(".*Kafka\\ Server\\ started.*\\n", 1))
           .withLogConsumer(new Slf4jLogConsumer(log, true).withPrefix("KAFKA"));
 
+  @Container
+  GenericContainer<?> setup =
+      new GenericContainer<>("confluentinc/cp-kcat:7.4.1")
+          .withCreateContainerCmdModifier(cmd -> cmd.withEntrypoint("sh", "-c"))
+      .withNetwork(network)
+      .withCommand(
+          "kafka-topics --bootstrap-server kafka:9999 --create --topic info_channel --partitions 3",
+          "kafka-topics --bootstrap-server kafka:9999 --create --topic data_channel --partitions 10")
+      .dependsOn(kafka)
+      .withLogConsumer(new Slf4jLogConsumer(log, true).withPrefix("SETUP"));
+
   @Container
   GenericContainer backend_1 =
       new GenericContainer(DockerImageName.parse("juplo/chat-backend:0.0.1-SNAPSHOT"))
@@ -108,7 +151,7 @@ class KafkaHandoverIT extends AbstractHandoverIT
               "--chat.backend.kafka.haproxy-runtime-api=haproxy:8401",
               "--chat.backend.kafka.haproxy-map=/usr/local/etc/haproxy/sharding.map"
           )
-          .dependsOn(kafka)
+          .dependsOn(setup)
           .waitingFor(Wait.forLogMessage(".*Started\\ ChatBackendApplication.*\\n", 1))
           .withLogConsumer(new Slf4jLogConsumer(log, true).withPrefix("BACKEND-1"));
 
@@ -128,7 +171,7 @@ class KafkaHandoverIT extends AbstractHandoverIT
               "--chat.backend.kafka.haproxy-runtime-api=haproxy:8401",
               "--chat.backend.kafka.haproxy-map=/usr/local/etc/haproxy/sharding.map"
           )
-          .dependsOn(kafka)
+          .dependsOn(setup)
           .waitingFor(Wait.forLogMessage(".*Started\\ ChatBackendApplication.*\\n", 1))
           .withLogConsumer(new Slf4jLogConsumer(log, true).withPrefix("BACKEND-2"));
 
@@ -148,7 +191,7 @@ class KafkaHandoverIT extends AbstractHandoverIT
               "--chat.backend.kafka.haproxy-runtime-api=haproxy:8401",
               "--chat.backend.kafka.haproxy-map=/usr/local/etc/haproxy/sharding.map"
           )
-          .dependsOn(kafka)
+          .dependsOn(setup)
           .waitingFor(Wait.forLogMessage(".*Started\\ ChatBackendApplication.*\\n", 1))
           .withLogConsumer(new Slf4jLogConsumer(log, true).withPrefix("BACKEND-3"));
 
@@ -168,4 +211,25 @@ class KafkaHandoverIT extends AbstractHandoverIT
           .withExposedPorts(8400) // , 8401, 8404, 5555)
           .dependsOn(backend_1, backend_2, backend_3)
           .withLogConsumer(new Slf4jLogConsumer(log, true).withPrefix("HAPROXY"));
+
+  @EqualsAndHashCode
+  @ToString
+  class User
+  {
+    @Getter
+    private final String name;
+    private int serial = 0;
+
+
+    User (String name)
+    {
+      this.name = name;
+    }
+
+
+    int nextSerial()
+    {
+      return ++serial;
+    }
+  }
 }