test: HandoverIT-POC: Adapted and reanabled the test
authorKai Moritz <kai@juplo.de>
Wed, 20 Mar 2024 18:24:34 +0000 (19:24 +0100)
committerKai Moritz <kai@juplo.de>
Fri, 22 Mar 2024 16:39:20 +0000 (17:39 +0100)
src/test/java/de/juplo/kafka/chat/backend/AbstractHandoverIT.java
src/test/java/de/juplo/kafka/chat/backend/AbstractHandoverITContainers.java
src/test/java/de/juplo/kafka/chat/backend/KafkaHandoverITContainers.java

index e0f12ef..86875c5 100644 (file)
@@ -5,7 +5,6 @@ import de.juplo.kafka.chat.backend.api.MessageTo;
 import lombok.extern.slf4j.Slf4j;
 import org.awaitility.Awaitility;
 import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 import org.springframework.http.HttpStatus;
 import org.springframework.http.MediaType;
@@ -37,7 +36,6 @@ public abstract class AbstractHandoverIT
   }
 
 
-  @Disabled
   @Test
   void test() throws InterruptedException
   {
index 57e8fb9..0bf7080 100644 (file)
@@ -1,30 +1,31 @@
 package de.juplo.kafka.chat.backend;
 
+import de.juplo.kafka.chat.backend.implementation.haproxy.MapEntryTo;
+import de.juplo.kafka.chat.backend.implementation.haproxy.MapInfoTo;
 import lombok.extern.slf4j.Slf4j;
 import org.awaitility.Awaitility;
 import org.springframework.http.HttpStatus;
 import org.springframework.http.HttpStatusCode;
+import org.springframework.http.MediaType;
 import org.springframework.web.reactive.function.client.WebClient;
-import org.testcontainers.containers.BindMode;
 import org.testcontainers.containers.GenericContainer;
 import org.testcontainers.containers.Network;
 import org.testcontainers.containers.output.Slf4jLogConsumer;
 import org.testcontainers.containers.wait.strategy.Wait;
 import org.testcontainers.images.ImagePullPolicy;
 import org.testcontainers.utility.DockerImageName;
+import org.testcontainers.utility.MountableFile;
 import reactor.core.publisher.Mono;
 import reactor.util.retry.Retry;
 
-import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.SocketAddress;
-import java.nio.ByteBuffer;
-import java.nio.channels.SocketChannel;
-import java.nio.charset.Charset;
 import java.time.Duration;
 import java.time.Instant;
 import java.util.Arrays;
 
+import static de.juplo.kafka.chat.backend.implementation.haproxy.HaproxyDataPlaneApiShardingPublisherStrategy.*;
+
 
 @Slf4j
 public abstract class AbstractHandoverITContainers
@@ -35,7 +36,6 @@ public abstract class AbstractHandoverITContainers
   final Network network = Network.newNetwork();
   final GenericContainer haproxy, backend1, backend2, backend3;
   final SocketAddress haproxyAddress;
-  final String map = "/usr/local/etc/haproxy/sharding.map";
 
 
   AbstractHandoverITContainers()
@@ -169,7 +169,7 @@ public abstract class AbstractHandoverITContainers
             if (sentTotal == numSentMessages[i])
             {
               log.info(
-                  "No progress for {}: sent-before={}, sent-total={}, map:\n{}\n",
+                  "No progress for {}: sent-before={}, sent-total={}, map: {}",
                   testWriter,
                   numSentMessages[i],
                   sentTotal,
@@ -189,17 +189,19 @@ public abstract class AbstractHandoverITContainers
   final GenericContainer createHaproxyContainer()
   {
     return new GenericContainer(DockerImageName.parse("haproxytech/haproxy-debian:2.8"))
+        .withCommand("-f", "/etc/haproxy")
         .withNetwork(network)
         .withNetworkAliases("haproxy")
-        .withClasspathResourceMapping(
-            "haproxy.cfg",
-            "/usr/local/etc/haproxy/haproxy.cfg",
-            BindMode.READ_ONLY)
-        .withClasspathResourceMapping(
-            "sharding.map",
-            "/usr/local/etc/haproxy/sharding.map",
-            BindMode.READ_WRITE)
-        .withExposedPorts(8400, 8401, 8404)
+        .withCopyFileToContainer(
+            MountableFile.forClasspathResource("haproxy.cfg"),
+            "/etc/haproxy/haproxy.cfg")
+        .withCopyFileToContainer(
+            MountableFile.forClasspathResource("dataplaneapi.yml"),
+            "/etc/haproxy/dataplaneapi.yml")
+        .withCopyFileToContainer(
+            MountableFile.forClasspathResource("sharding.map"),
+            "/etc/haproxy/maps/sharding.map")
+        .withExposedPorts(8400, 8401, 8404, 5555)
         .withLogConsumer(new Slf4jLogConsumer(log, true).withPrefix("HAPROXY"));
   }
 
@@ -223,27 +225,37 @@ public abstract class AbstractHandoverITContainers
 
   private String readHaproxyMap()
   {
-    try(SocketChannel socketChannel = SocketChannel.open(haproxyAddress))
-    {
-      String command = "show map " + map + "\n";
-      byte[] commandBytes = command.getBytes();
-      ByteBuffer buffer = ByteBuffer.wrap(commandBytes);
-      socketChannel.write(buffer);
-
-      ByteBuffer byteBuffer = ByteBuffer.allocate(512);
-      Charset charset = Charset.forName("UTF-8");
-      StringBuilder builder = new StringBuilder();
-      while (socketChannel.read(byteBuffer) > 0) {
-        byteBuffer.rewind();
-        builder.append(charset.decode(byteBuffer));
-        byteBuffer.flip();
-      }
-
-      return builder.toString();
-    }
-    catch(IOException e)
-    {
-      return e.toString();
-    }
+    return createHaproxyWebClient()
+        .get()
+        .uri(uriBuilder -> uriBuilder
+            .path("/services/haproxy/runtime/maps_entries")
+            .queryParam(MAP_PARAM, MAP_NAME)
+            .build())
+        .accept(MediaType.APPLICATION_JSON)
+        .exchangeToFlux(response ->
+        {
+          if (response.statusCode().equals(HttpStatus.OK))
+          {
+            return response.bodyToFlux(MapEntryTo.class);
+          }
+          else
+          {
+            return response.<MapEntryTo>createError().flux();
+          }
+        })
+        .map(entry -> entry.key() + "=" + entry.value())
+        .reduce((a, b) -> a + ", " + b)
+        .block();
+  }
+
+  private WebClient createHaproxyWebClient()
+  {
+    return WebClient
+        .builder()
+        .baseUrl("http://localhost:" + haproxy.getMappedPort(5555) + "/v2/")
+        .defaultHeaders(httpHeaders -> httpHeaders.setBasicAuth("juplo", "juplo"))
+        .build();
   }
+
+  static final String MAP_NAME = "sharding";
 }
index 0b7f269..0b670fc 100644 (file)
@@ -71,8 +71,10 @@ class KafkaHandoverITContainers extends AbstractHandoverITContainers
         "--chat.backend.kafka.instance-uri=http://backend-ID:8080",
         "--chat.backend.kafka.num-partitions=10",
         "--chat.backend.kafka.client-id-prefix=BID",
-        "--chat.backend.kafka.haproxy-runtime-api=haproxy:8401",
-        "--chat.backend.kafka.haproxy-map=/usr/local/etc/haproxy/sharding.map"
+        "--chat.backend.kafka.haproxy-data-plane-api=http://haproxy:5555/v2/",
+        "--chat.backend.kafka.haproxy-user=juplo",
+        "--chat.backend.kafka.haproxy-password=juplo",
+        "--chat.backend.kafka.haproxy-map=sharding",
     };
   }
 }