87f707e58164ee20f3295c9153f947bf605eb8ad
[demos/kafka/chat] /
1 package de.juplo.kafka.chat.backend.implementation.haproxy;
2
3 import com.adelean.inject.resources.junit.jupiter.GivenTextResource;
4 import com.adelean.inject.resources.junit.jupiter.TestWithResources;
5 import okhttp3.mockwebserver.MockResponse;
6 import okhttp3.mockwebserver.MockWebServer;
7 import okhttp3.mockwebserver.RecordedRequest;
8 import org.junit.jupiter.api.AfterEach;
9 import org.junit.jupiter.api.BeforeEach;
10 import org.junit.jupiter.api.DisplayName;
11 import org.junit.jupiter.api.Test;
12 import org.springframework.web.reactive.function.client.WebClient;
13 import reactor.core.publisher.Mono;
14
15 import java.io.IOException;
16 import java.time.Duration;
17 import java.util.concurrent.TimeUnit;
18
19 import static org.assertj.core.api.Assertions.assertThat;
20
21
22 @TestWithResources
23 public class HaproxyDataPlaneApiShardingPublisherStrategyTest
24 {
25   final static String MAP_PATH = "/usr/local/etc/haproxy/sharding.map";
26   final static String INSTANCE_ID = "backend_3";
27   final static int SHARD = 4;
28
29
30   MockWebServer mockHaproxy;
31   WebClient webClient;
32
33   @GivenTextResource("de/juplo/kafka/chat/backend/implementation/haproxy/maps__get.json")
34   String maps__get;
35   @GivenTextResource("de/juplo/kafka/chat/backend/implementation/haproxy/maps_entries__4__put.json")
36   String maps_entries__4__put;
37   @GivenTextResource("de/juplo/kafka/chat/backend/implementation/haproxy/maps_entries__4__put__error.json")
38   String maps_entries__4__put__error;
39
40
41   @BeforeEach
42   void setUp() throws IOException
43   {
44     mockHaproxy = new MockWebServer();
45     mockHaproxy.start();
46     webClient = WebClient
47         .builder()
48         .baseUrl(String.format("http://localhost:%s/v2", mockHaproxy.getPort()))
49         .build();
50   }
51
52   @AfterEach
53   void tearDown() throws IOException
54   {
55     mockHaproxy.shutdown();
56   }
57
58
59   @DisplayName("Requests the available maps from HAProxy via the expected path on instanciation")
60   @Test
61   void testRequestsMapsFromHaproxyViaTheExpectedPathOnInstanciation() throws InterruptedException
62   {
63     // Given
64     mockHaproxy.enqueue(new MockResponse()
65         .setStatus("HTTP/1.1 200 OK")
66         .setBody(maps__get)
67         .addHeader("Content-Type", "application/json"));
68
69     // When
70     HaproxyDataPlaneApiShardingPublisherStrategy shardingPublisherStrategy =
71         new HaproxyDataPlaneApiShardingPublisherStrategy(webClient, MAP_PATH, INSTANCE_ID);
72
73     // Then
74     RecordedRequest recordedRequest = mockHaproxy.takeRequest(1l, TimeUnit.SECONDS);
75     assertThat(recordedRequest.getPath())
76         .isEqualTo("/v2/services/haproxy/runtime/maps?include_unmanaged=true");
77   }
78
79   @DisplayName("Detects the expected map-ID on instanciation")
80   @Test
81   void testDetectsExpectedIdForMapOnInstanciation()
82   {
83     // Given
84     mockHaproxy.enqueue(new MockResponse()
85         .setStatus("HTTP/1.1 200 OK")
86         .setBody(maps__get)
87         .addHeader("Content-Type", "application/json"));
88
89     // When
90     HaproxyDataPlaneApiShardingPublisherStrategy shardingPublisherStrategy =
91         new HaproxyDataPlaneApiShardingPublisherStrategy(webClient, MAP_PATH, INSTANCE_ID);
92
93     // Then
94     assertThat(shardingPublisherStrategy.getMapId())
95         .isEqualTo(4);
96   }
97
98   @DisplayName("The expected result is yielded on successful publishing")
99   @Test
100   void testExpectedResultOnSuccessfulPublishing()
101   {
102     // Given
103     mockHaproxy.enqueue(new MockResponse()
104         .setStatus("HTTP/1.1 200 OK")
105         .setBody(maps__get)
106         .addHeader("Content-Type", "application/json"));
107     mockHaproxy.enqueue(new MockResponse()
108         .setStatus("HTTP/1.1 200 OK")
109         .setBody(maps_entries__4__put)
110         .addHeader("Content-Type", "application/json"));
111
112     // When
113     HaproxyDataPlaneApiShardingPublisherStrategy shardingPublisherStrategy =
114         new HaproxyDataPlaneApiShardingPublisherStrategy(webClient, MAP_PATH, INSTANCE_ID);
115     Mono<String> result = shardingPublisherStrategy.publishOwnership(SHARD);
116
117     // Then
118     assertThat(result.block(Duration.ofSeconds(1)))
119         .isEqualTo(INSTANCE_ID);
120   }
121
122   @DisplayName("The expected error is raised on failed publishing")
123   @Test
124   void testExpectedResultOnFailedPublishing()
125   {
126     // Given
127     mockHaproxy.enqueue(new MockResponse()
128         .setStatus("HTTP/1.1 200 OK")
129         .setBody(maps__get)
130         .addHeader("Content-Type", "application/json"));
131     mockHaproxy.enqueue(new MockResponse()
132         .setStatus("HTTP/1.1 400 Bad Request")
133         .setBody(maps_entries__4__put__error)
134         .addHeader("Content-Type", "application/json"));
135
136     // When
137     HaproxyDataPlaneApiShardingPublisherStrategy shardingPublisherStrategy =
138         new HaproxyDataPlaneApiShardingPublisherStrategy(webClient, MAP_PATH, INSTANCE_ID);
139     Mono<String> result = shardingPublisherStrategy
140         .publishOwnership(SHARD)
141         .onErrorResume(throwable -> Mono.just(throwable.getMessage()));
142
143     // Then
144     assertThat(result.block(Duration.ofSeconds(1)))
145         .isEqualTo("Evil Error -- BOOM!");
146   }
147 }