41ed2dead549dbaa2adeb7d6be1abed778f21742
[demos/kafka/chat] /
1 package de.juplo.kafka.chat.backend.implementation.haproxy;
2
3 import lombok.extern.slf4j.Slf4j;
4 import org.junit.jupiter.api.BeforeEach;
5 import org.junit.jupiter.api.Test;
6 import org.springframework.http.HttpStatus;
7 import org.springframework.http.MediaType;
8 import org.springframework.web.reactive.function.client.WebClient;
9 import org.testcontainers.containers.GenericContainer;
10 import org.testcontainers.containers.Network;
11 import org.testcontainers.containers.output.Slf4jLogConsumer;
12 import org.testcontainers.containers.wait.strategy.Wait;
13 import org.testcontainers.junit.jupiter.Container;
14 import org.testcontainers.junit.jupiter.Testcontainers;
15 import org.testcontainers.utility.DockerImageName;
16 import org.testcontainers.utility.MountableFile;
17 import reactor.core.publisher.Mono;
18 import reactor.util.retry.Retry;
19
20 import java.time.Duration;
21
22 import static de.juplo.kafka.chat.backend.implementation.haproxy.HaproxyDataPlaneApiShardingPublisherStrategy.MAP_ENTRY_PATH;
23 import static de.juplo.kafka.chat.backend.implementation.haproxy.HaproxyDataPlaneApiShardingPublisherStrategy.MAP_PARAM;
24 import static org.assertj.core.api.Assertions.assertThat;
25
26
27 @Testcontainers
28 @Slf4j
29 public class HaproxyDataPlaneApiShardingPublisherStrategyIT
30 {
31   @Test
32   void test() throws InterruptedException
33   {
34     Mono<String> result = shardingPublisherStrategy.publishOwnership(SHARD);
35
36     assertThat(result.block(Duration.ofSeconds(5)))
37         .isEqualTo(INSTANCE_ID);
38     assertThat(getMapEntryValueForKey(SHARD).block(Duration.ofSeconds(5)))
39         .isEqualTo(INSTANCE_ID);
40
41     HAPROXY
42         .getDockerClient()
43         .killContainerCmd(HAPROXY.getContainerId())
44         .withSignal("HUP")
45         .exec();
46
47     Thread.sleep(1000); // << No clue, how to detect that the reload is complete
48
49     assertThat(getMapEntryValueForKey(SHARD).block(Duration.ofSeconds(5)))
50         .isEqualTo(INSTANCE_ID);
51   }
52
53
54   private Mono<String> getMapEntryValueForKey(int key)
55   {
56     return webClient
57         .get()
58         .uri(uriBuilder -> uriBuilder
59             .path(MAP_ENTRY_PATH)
60             .queryParam(MAP_PARAM, "#" + shardingPublisherStrategy.getMapId())
61             .build(SHARD))
62         .accept(MediaType.APPLICATION_JSON)
63         .exchangeToMono(response ->
64         {
65           if (response.statusCode().equals(HttpStatus.OK))
66           {
67             return response.bodyToMono(MapEntryTo.class);
68           }
69           else
70           {
71             return response.createError();
72           }
73         })
74         .retryWhen(Retry.fixedDelay(15, Duration.ofSeconds(1)))
75         .map(entry -> entry.value());
76   }
77
78
79   WebClient webClient;
80   HaproxyDataPlaneApiShardingPublisherStrategy shardingPublisherStrategy;
81
82
83   @BeforeEach
84   void setUpWebClient()
85   {
86     webClient = WebClient
87         .builder()
88         .baseUrl("http://localhost:" + HAPROXY.getMappedPort(5555) + "/v2/")
89         .defaultHeaders(httpHeaders -> httpHeaders.setBasicAuth("juplo", "juplo"))
90         .build();
91
92     shardingPublisherStrategy = new HaproxyDataPlaneApiShardingPublisherStrategy(
93         webClient,
94         MAP_PATH,
95         INSTANCE_ID);
96   }
97
98
99   static final String MAP_PATH = "/usr/local/etc/haproxy/sharding.map";
100   static final String INSTANCE_ID = "foo";
101   static final int SHARD = 6;
102
103   @Container
104   static final GenericContainer HAPROXY =
105       new GenericContainer(DockerImageName.parse("haproxytech/haproxy-debian:2.8"))
106           .withNetwork(Network.newNetwork())
107           .withNetworkAliases("haproxy")
108           .withCopyFileToContainer(
109               MountableFile.forClasspathResource("haproxy.cfg"),
110               "/usr/local/etc/haproxy/haproxy.cfg")
111           .withCopyFileToContainer(
112               MountableFile.forClasspathResource("dataplaneapi.yml"),
113               "/usr/local/etc/haproxy/dataplaneapi.yml")
114           .withCopyFileToContainer(
115               MountableFile.forClasspathResource("sharding.map"),
116               "/usr/local/etc/haproxy/sharding.map")
117           .withExposedPorts(8400, 8401, 8404, 5555)
118           .withLogConsumer(new Slf4jLogConsumer(log, true).withPrefix("HAPROXY"));
119 }