package de.juplo.kafka.chat.backend;
+import de.juplo.kafka.chat.backend.implementation.haproxy.MapEntryTo;
+import de.juplo.kafka.chat.backend.implementation.haproxy.MapInfoTo;
import lombok.extern.slf4j.Slf4j;
import org.awaitility.Awaitility;
import org.springframework.http.HttpStatus;
import org.springframework.http.HttpStatusCode;
+import org.springframework.http.MediaType;
import org.springframework.web.reactive.function.client.WebClient;
-import org.testcontainers.containers.BindMode;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.containers.wait.strategy.Wait;
import org.testcontainers.images.ImagePullPolicy;
import org.testcontainers.utility.DockerImageName;
+import org.testcontainers.utility.MountableFile;
import reactor.core.publisher.Mono;
import reactor.util.retry.Retry;
-import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
-import java.nio.ByteBuffer;
-import java.nio.channels.SocketChannel;
-import java.nio.charset.Charset;
import java.time.Duration;
import java.time.Instant;
import java.util.Arrays;
+import static de.juplo.kafka.chat.backend.implementation.haproxy.HaproxyDataPlaneApiShardingPublisherStrategy.*;
+
@Slf4j
public abstract class AbstractHandoverITContainers
final Network network = Network.newNetwork();
final GenericContainer haproxy, backend1, backend2, backend3;
final SocketAddress haproxyAddress;
- final String map = "/usr/local/etc/haproxy/sharding.map";
AbstractHandoverITContainers()
if (sentTotal == numSentMessages[i])
{
log.info(
- "No progress for {}: sent-before={}, sent-total={}, map:\n{}\n",
+ "No progress for {}: sent-before={}, sent-total={}, map: {}",
testWriter,
numSentMessages[i],
sentTotal,
final GenericContainer createHaproxyContainer()
{
return new GenericContainer(DockerImageName.parse("haproxytech/haproxy-debian:2.8"))
+ .withCommand("-f", "/etc/haproxy")
.withNetwork(network)
.withNetworkAliases("haproxy")
- .withClasspathResourceMapping(
- "haproxy.cfg",
- "/usr/local/etc/haproxy/haproxy.cfg",
- BindMode.READ_ONLY)
- .withClasspathResourceMapping(
- "sharding.map",
- "/usr/local/etc/haproxy/sharding.map",
- BindMode.READ_WRITE)
- .withExposedPorts(8400, 8401, 8404)
+ .withCopyFileToContainer(
+ MountableFile.forClasspathResource("haproxy.cfg"),
+ "/etc/haproxy/haproxy.cfg")
+ .withCopyFileToContainer(
+ MountableFile.forClasspathResource("dataplaneapi.yml"),
+ "/etc/haproxy/dataplaneapi.yml")
+ .withCopyFileToContainer(
+ MountableFile.forClasspathResource("sharding.map"),
+ "/etc/haproxy/maps/sharding.map")
+ .withExposedPorts(8400, 8401, 8404, 5555)
.withLogConsumer(new Slf4jLogConsumer(log, true).withPrefix("HAPROXY"));
}
private String readHaproxyMap()
{
- try(SocketChannel socketChannel = SocketChannel.open(haproxyAddress))
- {
- String command = "show map " + map + "\n";
- byte[] commandBytes = command.getBytes();
- ByteBuffer buffer = ByteBuffer.wrap(commandBytes);
- socketChannel.write(buffer);
-
- ByteBuffer byteBuffer = ByteBuffer.allocate(512);
- Charset charset = Charset.forName("UTF-8");
- StringBuilder builder = new StringBuilder();
- while (socketChannel.read(byteBuffer) > 0) {
- byteBuffer.rewind();
- builder.append(charset.decode(byteBuffer));
- byteBuffer.flip();
- }
-
- return builder.toString();
- }
- catch(IOException e)
- {
- return e.toString();
- }
+ return createHaproxyWebClient()
+ .get()
+ .uri(uriBuilder -> uriBuilder
+ .path("/services/haproxy/runtime/maps_entries")
+ .queryParam(MAP_PARAM, MAP_NAME)
+ .build())
+ .accept(MediaType.APPLICATION_JSON)
+ .exchangeToFlux(response ->
+ {
+ if (response.statusCode().equals(HttpStatus.OK))
+ {
+ return response.bodyToFlux(MapEntryTo.class);
+ }
+ else
+ {
+ return response.<MapEntryTo>createError().flux();
+ }
+ })
+ .map(entry -> entry.key() + "=" + entry.value())
+ .reduce((a, b) -> a + ", " + b)
+ .block();
+ }
+
+ private WebClient createHaproxyWebClient()
+ {
+ return WebClient
+ .builder()
+ .baseUrl("http://localhost:" + haproxy.getMappedPort(5555) + "/v2/")
+ .defaultHeaders(httpHeaders -> httpHeaders.setBasicAuth("juplo", "juplo"))
+ .build();
}
+
+ static final String MAP_NAME = "sharding";
}