9235a062301035500b38dd1d9eb10d5ee4db3880
[demos/kafka/chat] /
1 package de.juplo.kafka.chat.backend.implementation.haproxy;
2
3 import lombok.extern.slf4j.Slf4j;
4 import org.junit.jupiter.api.BeforeEach;
5 import org.junit.jupiter.api.Test;
6 import org.springframework.http.HttpStatus;
7 import org.springframework.http.MediaType;
8 import org.springframework.web.reactive.function.client.WebClient;
9 import org.testcontainers.containers.GenericContainer;
10 import org.testcontainers.containers.Network;
11 import org.testcontainers.containers.output.Slf4jLogConsumer;
12 import org.testcontainers.junit.jupiter.Container;
13 import org.testcontainers.junit.jupiter.Testcontainers;
14 import org.testcontainers.utility.DockerImageName;
15 import org.testcontainers.utility.MountableFile;
16 import reactor.core.publisher.Mono;
17
18 import java.time.Duration;
19
20 import static de.juplo.kafka.chat.backend.implementation.haproxy.HaproxyDataPlaneApiShardingPublisherStrategy.MAP_ENTRY_PATH;
21 import static de.juplo.kafka.chat.backend.implementation.haproxy.HaproxyDataPlaneApiShardingPublisherStrategy.MAP_PARAM;
22 import static org.assertj.core.api.Assertions.assertThat;
23
24
25 @Testcontainers
26 @Slf4j
27 public class HaproxyDataPlaneApiShardingPublisherStrategyIT
28 {
29   @Test
30   void test()
31   {
32     Mono<String> result = shardingPublisherStrategy.publishOwnership(SHARD);
33
34     assertThat(result.block(Duration.ofSeconds(5)))
35         .isEqualTo(INSTANCE_ID);
36     assertThat(getMapEntryValueForKey(SHARD).block(Duration.ofSeconds(5)))
37         .isEqualTo(INSTANCE_ID);
38   }
39
40
41   private Mono<String> getMapEntryValueForKey(int key)
42   {
43     return webClient
44         .get()
45         .uri(uriBuilder -> uriBuilder
46             .path(MAP_ENTRY_PATH)
47             .queryParam(MAP_PARAM, "#" + shardingPublisherStrategy.getMapId())
48             .build(SHARD))
49         .accept(MediaType.APPLICATION_JSON)
50         .exchangeToMono(response ->
51         {
52           if (response.statusCode().equals(HttpStatus.OK))
53           {
54             return response.bodyToMono(MapEntryTo.class);
55           }
56           else
57           {
58             return response.createError();
59           }
60         })
61         .map(entry -> entry.value());
62   }
63
64
65   WebClient webClient;
66   HaproxyDataPlaneApiShardingPublisherStrategy shardingPublisherStrategy;
67
68
69   @BeforeEach
70   void setUpWebClient()
71   {
72     webClient = WebClient
73         .builder()
74         .baseUrl("http://localhost:" + HAPROXY.getMappedPort(5555) + "/v2/")
75         .defaultHeaders(httpHeaders -> httpHeaders.setBasicAuth("juplo", "juplo"))
76         .build();
77
78     shardingPublisherStrategy = new HaproxyDataPlaneApiShardingPublisherStrategy(
79         webClient,
80         MAP_PATH,
81         INSTANCE_ID);
82   }
83
84
85   static final String MAP_PATH = "/usr/local/etc/haproxy/sharding.map";
86   static final String INSTANCE_ID = "foo";
87   static final int SHARD = 6;
88
89   @Container
90   static final GenericContainer HAPROXY =
91       new GenericContainer(DockerImageName.parse("haproxytech/haproxy-debian:2.8"))
92           .withNetwork(Network.newNetwork())
93           .withNetworkAliases("haproxy")
94           .withCopyFileToContainer(
95               MountableFile.forClasspathResource("haproxy.cfg"),
96               "/usr/local/etc/haproxy/haproxy.cfg")
97           .withCopyFileToContainer(
98               MountableFile.forClasspathResource("dataplaneapi.yml"),
99               "/usr/local/etc/haproxy/dataplaneapi.yml")
100           .withCopyFileToContainer(
101               MountableFile.forClasspathResource("sharding.map"),
102               "/usr/local/etc/haproxy/sharding.map")
103           .withExposedPorts(8400, 8401, 8404, 5555)
104           .withLogConsumer(new Slf4jLogConsumer(log, true).withPrefix("HAPROXY"));
105 }