1 package de.juplo.kafka.chat.backend.implementation.haproxy;
3 import lombok.extern.slf4j.Slf4j;
4 import org.junit.jupiter.api.BeforeEach;
5 import org.junit.jupiter.api.Test;
6 import org.springframework.http.HttpStatus;
7 import org.springframework.http.MediaType;
8 import org.springframework.web.reactive.function.client.WebClient;
9 import org.testcontainers.containers.GenericContainer;
10 import org.testcontainers.containers.Network;
11 import org.testcontainers.containers.output.Slf4jLogConsumer;
12 import org.testcontainers.containers.wait.strategy.Wait;
13 import org.testcontainers.junit.jupiter.Container;
14 import org.testcontainers.junit.jupiter.Testcontainers;
15 import org.testcontainers.utility.DockerImageName;
16 import org.testcontainers.utility.MountableFile;
17 import reactor.core.publisher.Mono;
18 import reactor.util.retry.Retry;
20 import java.time.Duration;
22 import static de.juplo.kafka.chat.backend.implementation.haproxy.HaproxyDataPlaneApiShardingPublisherStrategy.MAP_ENTRY_PATH;
23 import static de.juplo.kafka.chat.backend.implementation.haproxy.HaproxyDataPlaneApiShardingPublisherStrategy.MAP_PARAM;
24 import static org.assertj.core.api.Assertions.assertThat;
// Integration test class for HaproxyDataPlaneApiShardingPublisherStrategy. The visible
// members exercise the strategy against an HAProxy instance that is declared as a
// Testcontainers GenericContainer.
29 public class HaproxyDataPlaneApiShardingPublisherStrategyIT
// Publishes ownership of SHARD through the strategy under test and verifies the
// outcome in three steps:
//   1. the value emitted by publishOwnership(SHARD) equals INSTANCE_ID,
//   2. the map entry read back via getMapEntryValueForKey(SHARD) equals INSTANCE_ID,
//   3. the same lookup still yields INSTANCE_ID after the container command and the
//      pause further down.
// Every blocking wait is capped at 5 seconds.
// NOTE(review): getMapEntryValueForKey retries up to 15 times, 1 second apart, so the
// 5-second block() timeout presumably fires before the retries are exhausted - confirm
// that this is intended.
32 void test() throws InterruptedException
34 Mono<String> result = shardingPublisherStrategy.publishOwnership(SHARD);
36 assertThat(result.block(Duration.ofSeconds(5)))
37 .isEqualTo(INSTANCE_ID);
38 assertThat(getMapEntryValueForKey(SHARD).block(Duration.ofSeconds(5)))
39 .isEqualTo(INSTANCE_ID);
// Fragment of a command chain whose start and end are not visible in this excerpt.
// Presumably it signals the HAProxy container to trigger the reload mentioned
// below - TODO confirm which signal is sent.
43 .killContainerCmd(HAPROXY.getContainerId())
// Fixed 1-second pause, because completion of the reload cannot be detected (see the
// inline note). NOTE(review): timing-dependent; may be too short on a slow host.
47 Thread.sleep(1000); // << No clue, how to detect that the reload is complete
// Same lookup as before the pause: the entry is expected to be unchanged.
49 assertThat(getMapEntryValueForKey(SHARD).block(Duration.ofSeconds(5)))
50 .isEqualTo(INSTANCE_ID);
// Looks up the value stored for the given key and emits it as a Mono<String>.
// Visible behavior: the request carries the query parameter MAP_PARAM set to "#"
// followed by the strategy's map id, and asks for a JSON response.
// The client, HTTP method and path are on lines not visible in this excerpt
// (presumably the path is built from MAP_ENTRY_PATH and the key - confirm).
54 private Mono<String> getMapEntryValueForKey(int key)
58 .uri(uriBuilder -> uriBuilder
60 .queryParam(MAP_PARAM, "#" + shardingPublisherStrategy.getMapId())
62 .accept(MediaType.APPLICATION_JSON)
63 .exchangeToMono(response ->
// Only an HTTP 200 response is decoded into a MapEntryTo ...
65 if (response.statusCode().equals(HttpStatus.OK))
67 return response.bodyToMono(MapEntryTo.class);
// ... any other status is turned into an error signal, which feeds the retry below.
71 return response.createError();
// Re-subscribe on error: at most 15 retries with a fixed 1-second delay between attempts.
74 .retryWhen(Retry.fixedDelay(15, Duration.ofSeconds(1)))
// Reduce the decoded entry to its value.
75 .map(entry -> entry.value());
// Strategy under test. It is assigned a new instance further down; the header of the
// enclosing setup code is not visible in this excerpt.
80 HaproxyDataPlaneApiShardingPublisherStrategy shardingPublisherStrategy;
// Base URL of the client being built: localhost at the host port mapped to container
// port 5555, under /v2/. Presumably 5555 is the port the Data Plane API listens on -
// confirm against dataplaneapi.yml.
88 .baseUrl("http://localhost:" + HAPROXY.getMappedPort(5555) + "/v2/")
// Every request is sent with HTTP basic auth, user "juplo" and password "juplo".
89 .defaultHeaders(httpHeaders -> httpHeaders.setBasicAuth("juplo", "juplo"))
// The constructor arguments are on lines not visible in this excerpt.
92 shardingPublisherStrategy = new HaproxyDataPlaneApiShardingPublisherStrategy(
// Path of the sharding map; identical to the container path that the classpath
// resource sharding.map is copied to in the HAPROXY container definition.
99 static final String MAP_PATH = "/usr/local/etc/haproxy/sharding.map";
// Value the assertions in test() expect for SHARD; presumably the instance id that is
// handed to the strategy - confirm against the constructor call.
100 static final String INSTANCE_ID = "foo";
// Shard number whose ownership is published and looked up in test().
101 static final int SHARD = 6;
// HAProxy container shared by the tests, built from the image
// haproxytech/haproxy-debian:2.8.
// NOTE(review): GenericContainer is used as a raw type here; the Testcontainers
// examples declare it as GenericContainer<?> - consider aligning.
104 static final GenericContainer HAPROXY =
105 new GenericContainer(DockerImageName.parse("haproxytech/haproxy-debian:2.8"))
// Attach the container to a newly created network, reachable there as "haproxy".
106 .withNetwork(Network.newNetwork())
107 .withNetworkAliases("haproxy")
// Copy three classpath resources into /usr/local/etc/haproxy/ inside the container:
// the HAProxy configuration, ...
108 .withCopyFileToContainer(
109 MountableFile.forClasspathResource("haproxy.cfg"),
110 "/usr/local/etc/haproxy/haproxy.cfg")
// ... the dataplaneapi.yml file, ...
111 .withCopyFileToContainer(
112 MountableFile.forClasspathResource("dataplaneapi.yml"),
113 "/usr/local/etc/haproxy/dataplaneapi.yml")
// ... and the sharding map (same target path as MAP_PATH).
114 .withCopyFileToContainer(
115 MountableFile.forClasspathResource("sharding.map"),
116 "/usr/local/etc/haproxy/sharding.map")
// Container ports made reachable from the host. Port 5555 is the one the test client
// connects to via getMappedPort(5555); the roles of 8400, 8401 and 8404 are not visible
// in this excerpt - presumably they are defined in haproxy.cfg (confirm).
117 .withExposedPorts(8400, 8401, 8404, 5555)
// Forward the container's output to the SLF4J logger `log`, prefixed with "HAPROXY".
118 .withLogConsumer(new Slf4jLogConsumer(log, true).withPrefix("HAPROXY"));