From: Kai Moritz Date: Wed, 20 Mar 2024 18:24:34 +0000 (+0100) Subject: test: HandoverIT-POC: Adapted and reanabled the test X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=af023bcca9b2b893342120e99618a7e739791c9f;p=demos%2Fkafka%2Fchat test: HandoverIT-POC: Adapted and reanabled the test --- diff --git a/src/test/java/de/juplo/kafka/chat/backend/AbstractHandoverIT.java b/src/test/java/de/juplo/kafka/chat/backend/AbstractHandoverIT.java index e0f12ef8..86875c59 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/AbstractHandoverIT.java +++ b/src/test/java/de/juplo/kafka/chat/backend/AbstractHandoverIT.java @@ -5,7 +5,6 @@ import de.juplo.kafka.chat.backend.api.MessageTo; import lombok.extern.slf4j.Slf4j; import org.awaitility.Awaitility; import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.springframework.http.HttpStatus; import org.springframework.http.MediaType; @@ -37,7 +36,6 @@ public abstract class AbstractHandoverIT } - @Disabled @Test void test() throws InterruptedException { diff --git a/src/test/java/de/juplo/kafka/chat/backend/AbstractHandoverITContainers.java b/src/test/java/de/juplo/kafka/chat/backend/AbstractHandoverITContainers.java index 57e8fb95..0bf7080d 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/AbstractHandoverITContainers.java +++ b/src/test/java/de/juplo/kafka/chat/backend/AbstractHandoverITContainers.java @@ -1,30 +1,31 @@ package de.juplo.kafka.chat.backend; +import de.juplo.kafka.chat.backend.implementation.haproxy.MapEntryTo; +import de.juplo.kafka.chat.backend.implementation.haproxy.MapInfoTo; import lombok.extern.slf4j.Slf4j; import org.awaitility.Awaitility; import org.springframework.http.HttpStatus; import org.springframework.http.HttpStatusCode; +import org.springframework.http.MediaType; import org.springframework.web.reactive.function.client.WebClient; -import org.testcontainers.containers.BindMode; import org.testcontainers.containers.GenericContainer; import org.testcontainers.containers.Network; import org.testcontainers.containers.output.Slf4jLogConsumer; import org.testcontainers.containers.wait.strategy.Wait; import org.testcontainers.images.ImagePullPolicy; import org.testcontainers.utility.DockerImageName; +import org.testcontainers.utility.MountableFile; import reactor.core.publisher.Mono; import reactor.util.retry.Retry; -import java.io.IOException; import java.net.InetSocketAddress; import java.net.SocketAddress; -import java.nio.ByteBuffer; -import java.nio.channels.SocketChannel; -import java.nio.charset.Charset; import java.time.Duration; import java.time.Instant; import java.util.Arrays; +import static de.juplo.kafka.chat.backend.implementation.haproxy.HaproxyDataPlaneApiShardingPublisherStrategy.*; + @Slf4j public abstract class AbstractHandoverITContainers @@ -35,7 +36,6 @@ public abstract class AbstractHandoverITContainers final Network network = Network.newNetwork(); final GenericContainer haproxy, backend1, backend2, backend3; final SocketAddress haproxyAddress; - final String map = "/usr/local/etc/haproxy/sharding.map"; AbstractHandoverITContainers() @@ -169,7 +169,7 @@ public abstract class AbstractHandoverITContainers if (sentTotal == numSentMessages[i]) { log.info( - "No progress for {}: sent-before={}, sent-total={}, map:\n{}\n", + "No progress for {}: sent-before={}, sent-total={}, map: {}", testWriter, numSentMessages[i], sentTotal, @@ -189,17 +189,19 @@ public abstract class AbstractHandoverITContainers final GenericContainer createHaproxyContainer() { return new GenericContainer(DockerImageName.parse("haproxytech/haproxy-debian:2.8")) + .withCommand("-f", "/etc/haproxy") .withNetwork(network) .withNetworkAliases("haproxy") - .withClasspathResourceMapping( - "haproxy.cfg", - "/usr/local/etc/haproxy/haproxy.cfg", - BindMode.READ_ONLY) - .withClasspathResourceMapping( - "sharding.map", - "/usr/local/etc/haproxy/sharding.map", - BindMode.READ_WRITE) - .withExposedPorts(8400, 8401, 8404) + .withCopyFileToContainer( + MountableFile.forClasspathResource("haproxy.cfg"), + "/etc/haproxy/haproxy.cfg") + .withCopyFileToContainer( + MountableFile.forClasspathResource("dataplaneapi.yml"), + "/etc/haproxy/dataplaneapi.yml") + .withCopyFileToContainer( + MountableFile.forClasspathResource("sharding.map"), + "/etc/haproxy/maps/sharding.map") + .withExposedPorts(8400, 8401, 8404, 5555) .withLogConsumer(new Slf4jLogConsumer(log, true).withPrefix("HAPROXY")); } @@ -223,27 +225,37 @@ public abstract class AbstractHandoverITContainers private String readHaproxyMap() { - try(SocketChannel socketChannel = SocketChannel.open(haproxyAddress)) - { - String command = "show map " + map + "\n"; - byte[] commandBytes = command.getBytes(); - ByteBuffer buffer = ByteBuffer.wrap(commandBytes); - socketChannel.write(buffer); - - ByteBuffer byteBuffer = ByteBuffer.allocate(512); - Charset charset = Charset.forName("UTF-8"); - StringBuilder builder = new StringBuilder(); - while (socketChannel.read(byteBuffer) > 0) { - byteBuffer.rewind(); - builder.append(charset.decode(byteBuffer)); - byteBuffer.flip(); - } - - return builder.toString(); - } - catch(IOException e) - { - return e.toString(); - } + return createHaproxyWebClient() + .get() + .uri(uriBuilder -> uriBuilder + .path("/services/haproxy/runtime/maps_entries") + .queryParam(MAP_PARAM, MAP_NAME) + .build()) + .accept(MediaType.APPLICATION_JSON) + .exchangeToFlux(response -> + { + if (response.statusCode().equals(HttpStatus.OK)) + { + return response.bodyToFlux(MapEntryTo.class); + } + else + { + return response.createError().flux(); + } + }) + .map(entry -> entry.key() + "=" + entry.value()) + .reduce((a, b) -> a + ", " + b) + .block(); + } + + private WebClient createHaproxyWebClient() + { + return WebClient + .builder() + .baseUrl("http://localhost:" + haproxy.getMappedPort(5555) + "/v2/") + .defaultHeaders(httpHeaders -> httpHeaders.setBasicAuth("juplo", "juplo")) + .build(); } + + static final String MAP_NAME = "sharding"; } diff --git a/src/test/java/de/juplo/kafka/chat/backend/KafkaHandoverITContainers.java b/src/test/java/de/juplo/kafka/chat/backend/KafkaHandoverITContainers.java index 0b7f269b..0b670fc5 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/KafkaHandoverITContainers.java +++ b/src/test/java/de/juplo/kafka/chat/backend/KafkaHandoverITContainers.java @@ -71,8 +71,10 @@ class KafkaHandoverITContainers extends AbstractHandoverITContainers "--chat.backend.kafka.instance-uri=http://backend-ID:8080", "--chat.backend.kafka.num-partitions=10", "--chat.backend.kafka.client-id-prefix=BID", - "--chat.backend.kafka.haproxy-runtime-api=haproxy:8401", - "--chat.backend.kafka.haproxy-map=/usr/local/etc/haproxy/sharding.map" + "--chat.backend.kafka.haproxy-data-plane-api=http://haproxy:5555/v2/", + "--chat.backend.kafka.haproxy-user=juplo", + "--chat.backend.kafka.haproxy-password=juplo", + "--chat.backend.kafka.haproxy-map=sharding", }; } }