1 package de.juplo.kafka.chat.backend.implementation.haproxy;
3 import lombok.extern.slf4j.Slf4j;
4 import org.junit.jupiter.api.BeforeEach;
5 import org.junit.jupiter.api.Test;
6 import org.springframework.http.HttpStatus;
7 import org.springframework.http.MediaType;
8 import org.springframework.web.reactive.function.client.WebClient;
9 import org.testcontainers.containers.GenericContainer;
10 import org.testcontainers.containers.Network;
11 import org.testcontainers.containers.output.Slf4jLogConsumer;
12 import org.testcontainers.junit.jupiter.Container;
13 import org.testcontainers.junit.jupiter.Testcontainers;
14 import org.testcontainers.utility.DockerImageName;
15 import org.testcontainers.utility.MountableFile;
16 import reactor.core.publisher.Mono;
18 import java.time.Duration;
20 import static de.juplo.kafka.chat.backend.implementation.haproxy.HaproxyDataPlaneApiShardingPublisherStrategy.MAP_ENTRY_PATH;
21 import static de.juplo.kafka.chat.backend.implementation.haproxy.HaproxyDataPlaneApiShardingPublisherStrategy.MAP_PARAM;
22 import static org.assertj.core.api.Assertions.assertThat;
27 public class HaproxyDataPlaneApiShardingPublisherStrategyIT
32 Mono<String> result = shardingPublisherStrategy.publishOwnership(SHARD);
34 assertThat(result.block(Duration.ofSeconds(5)))
35 .isEqualTo(INSTANCE_ID);
36 assertThat(getMapEntryValueForKey(SHARD).block(Duration.ofSeconds(5)))
37 .isEqualTo(INSTANCE_ID);
41 private Mono<String> getMapEntryValueForKey(int key)
45 .uri(uriBuilder -> uriBuilder
47 .queryParam(MAP_PARAM, "#" + shardingPublisherStrategy.getMapId())
49 .accept(MediaType.APPLICATION_JSON)
50 .exchangeToMono(response ->
52 if (response.statusCode().equals(HttpStatus.OK))
54 return response.bodyToMono(MapEntryTo.class);
58 return response.createError();
61 .map(entry -> entry.value());
66 HaproxyDataPlaneApiShardingPublisherStrategy shardingPublisherStrategy;
74 .baseUrl("http://localhost:" + HAPROXY.getMappedPort(5555) + "/v2/")
75 .defaultHeaders(httpHeaders -> httpHeaders.setBasicAuth("juplo", "juplo"))
78 shardingPublisherStrategy = new HaproxyDataPlaneApiShardingPublisherStrategy(
85 static final String MAP_PATH = "/usr/local/etc/haproxy/sharding.map";
86 static final String INSTANCE_ID = "foo";
87 static final int SHARD = 6;
90 static final GenericContainer HAPROXY =
91 new GenericContainer(DockerImageName.parse("haproxytech/haproxy-debian:2.8"))
92 .withNetwork(Network.newNetwork())
93 .withNetworkAliases("haproxy")
94 .withCopyFileToContainer(
95 MountableFile.forClasspathResource("haproxy.cfg"),
96 "/usr/local/etc/haproxy/haproxy.cfg")
97 .withCopyFileToContainer(
98 MountableFile.forClasspathResource("dataplaneapi.yml"),
99 "/usr/local/etc/haproxy/dataplaneapi.yml")
100 .withCopyFileToContainer(
101 MountableFile.forClasspathResource("sharding.map"),
102 "/usr/local/etc/haproxy/sharding.map")
103 .withExposedPorts(8400, 8401, 8404, 5555)
104 .withLogConsumer(new Slf4jLogConsumer(log, true).withPrefix("HAPROXY"));