WIP:poc-setup
[demos/kafka/chat] / src / test / java / de / juplo / kafka / chat / backend / KafkaHandoverIT.java
1 package de.juplo.kafka.chat.backend;
2
3 import de.juplo.kafka.chat.backend.api.ChatRoomInfoTo;
4 import de.juplo.kafka.chat.backend.api.MessageTo;
5 import lombok.EqualsAndHashCode;
6 import lombok.Getter;
7 import lombok.Setter;
8 import lombok.ToString;
9 import lombok.extern.slf4j.Slf4j;
10 import org.awaitility.Awaitility;
11 import org.junit.jupiter.api.BeforeAll;
12 import org.junit.jupiter.api.BeforeEach;
13 import org.junit.jupiter.api.Test;
14 import org.springframework.core.ParameterizedTypeReference;
15 import org.springframework.http.HttpStatus;
16 import org.springframework.http.MediaType;
17 import org.springframework.http.codec.ServerSentEvent;
18 import org.springframework.web.reactive.function.client.WebClient;
19 import org.testcontainers.containers.*;
20 import org.testcontainers.containers.output.Slf4jLogConsumer;
21 import org.testcontainers.containers.wait.strategy.Wait;
22 import org.testcontainers.utility.DockerImageName;
23 import reactor.core.publisher.Flux;
24 import reactor.core.publisher.Mono;
25 import reactor.util.retry.Retry;
26
27 import java.io.IOException;
28 import java.time.Duration;
29 import java.util.stream.IntStream;
30
31
32 @Slf4j
33 class KafkaHandoverIT extends AbstractHandoverIT
34 {
35   @Test
36   void test() throws InterruptedException
37   {
38     ChatRoomInfoTo chatRoom = createChatRoom("bar").block();
39     User user = new User("nerd");
40     IntStream
41         .rangeClosed(1,100)
42         .mapToObj(i ->sendMessage(chatRoom, user, "Message #" + i))
43         .map(result -> result
44             .map(MessageTo::toString)
45             .retryWhen(Retry.fixedDelay(10, Duration.ofSeconds(1)))
46             .block())
47         .forEach(result -> log.info("{}", result));
48
49     Long count = receiveMessages(chatRoom)
50         .doOnNext(message -> log.info("message: {}", message))
51         .count()
52         .block();
53     log.info("Received {} messages", count);
54   }
55
56   Mono<ChatRoomInfoTo> createChatRoom(String name)
57   {
58     return webClient
59         .post()
60         .uri("/create")
61         .contentType(MediaType.TEXT_PLAIN)
62         .bodyValue(name)
63         .accept(MediaType.APPLICATION_JSON)
64         .exchangeToMono(response ->
65         {
66           if (response.statusCode().equals(HttpStatus.OK))
67           {
68             return response.bodyToMono(ChatRoomInfoTo.class);
69           }
70           else
71           {
72             return response.createError();
73           }
74         });
75   }
76
77   Mono<MessageTo> sendMessage(
78       ChatRoomInfoTo chatRoom,
79       User user,
80       String message)
81   {
82     return webClient
83         .put()
84         .uri(
85             "/{chatRoomId}/{username}/{serial}",
86             chatRoom.getId(),
87             user.getName(),
88             user.nextSerial())
89         .contentType(MediaType.TEXT_PLAIN)
90         .accept(MediaType.APPLICATION_JSON)
91         .bodyValue(message)
92         .exchangeToMono(response ->
93         {
94           if (response.statusCode().equals(HttpStatus.OK))
95           {
96             return response.bodyToMono(MessageTo.class);
97           }
98           else
99           {
100             return response.createError();
101           }
102         });
103   }
104
105   Flux<ServerSentEvent<String>> receiveMessages(ChatRoomInfoTo chatRoom)
106   {
107     return webClient
108         .get()
109         .uri(
110             "/{chatRoomId}/listen",
111             chatRoom.getId())
112         .accept(MediaType.TEXT_EVENT_STREAM)
113         .retrieve()
114         .bodyToFlux(sseType);
115   }
116
117   @BeforeEach
118   void setUpWebClient()
119   {
120     Integer port = HAPROXY.getMappedPort(8400);
121     webClient = WebClient.create("http://localhost:" + port);
122
123     Awaitility
124         .await()
125         .atMost(Duration.ofMinutes(10))
126         .until(() -> WebClient
127             .create("http://localhost:" + BACKEND_1.getMappedPort(8080))
128             .get()
129             .uri("/actuator/health")
130             .exchangeToMono(response ->
131             {
132               if (response.statusCode().equals(HttpStatus.OK))
133               {
134                 return response
135                     .bodyToMono(StatusTo.class)
136                     .map(StatusTo::getStatus)
137                     .map(status -> status.equalsIgnoreCase("UP"));
138               }
139               else
140               {
141                 return Mono.just(false);
142               }
143             })
144             .block());
145
146     HAPROXY
147         .getDockerClient()
148         .killContainerCmd(HAPROXY.getContainerId())
149         .withSignal("HUP")
150         .exec();
151
152
153     Awaitility
154         .await()
155         .atMost(Duration.ofMinutes(10))
156         .until(() -> webClient
157             .get()
158             .uri("/actuator/health")
159             .exchangeToMono(response ->
160             {
161               if (response.statusCode().equals(HttpStatus.OK))
162               {
163                 return response
164                     .bodyToMono(StatusTo.class)
165                     .map(StatusTo::getStatus)
166                     .map(status -> status.equalsIgnoreCase("UP"));
167               }
168               else
169               {
170                 return Mono.just(false);
171               }
172             })
173             .block());
174   }
175
176   WebClient webClient;
177
178
179   @BeforeAll
180   static void setUpDocker() throws IOException, InterruptedException
181   {
182     KAFKA.start();
183     HAPROXY.start();
184
185     Container.ExecResult result;
186     result = KAFKA.execInContainer(
187         "kafka-topics",
188         "--bootstrap-server",
189         "kafka:9999",
190         "--create",
191         "--topic",
192         "info_channel",
193         "--partitions",
194         "3");
195     log.info(
196         "EXIT-CODE={}, STDOUT={}, STDERR={}",
197         result.getExitCode(),
198         result.getStdout(),
199         result.getStdout());
200     result = KAFKA.execInContainer(
201         "kafka-topics",
202         "--bootstrap-server",
203         "kafka:9999",
204         "--create",
205         "--topic",
206         "data_channel",
207         "--partitions",
208         "10");
209     log.info(
210         "EXIT-CODE={}, STDOUT={}, STDERR={}",
211         result.getExitCode(),
212         result.getStdout(),
213         result.getStdout());
214
215     BACKEND_1.start();
216     // BACKEND_2.start();
217     // BACKEND_3.start();
218   }
219
220   static Network NETWORK = Network.newNetwork();
221
222   static KafkaContainer KAFKA =
223       new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.4.0"))
224           .withNetwork(NETWORK)
225           .withNetworkAliases("kafka")
226           .withListener(() -> "kafka:9999")
227           .withKraft()
228           .waitingFor(Wait.forLogMessage(".*Kafka\\ Server\\ started.*\\n", 1))
229           .withLogConsumer(new Slf4jLogConsumer(log, true).withPrefix("KAFKA"));
230
231   static GenericContainer BACKEND_1 =
232       new GenericContainer(DockerImageName.parse("juplo/chat-backend:0.0.1-SNAPSHOT"))
233           .withImagePullPolicy(NEVER_PULL)
234           .withNetwork(NETWORK)
235           .withNetworkAliases("backend-1")
236           .withCommand(
237               "--chat.backend.instance-id=backend-1",
238               "--chat.backend.services=kafka",
239               "--chat.backend.kafka.bootstrap-servers=kafka:9999",
240               "--chat.backend.kafka.instance-uri=http://backend-1:8080",
241               "--chat.backend.kafka.num-partitions=10",
242               "--chat.backend.kafka.client-id-prefix=B1",
243               "--chat.backend.kafka.haproxy-runtime-api=haproxy:8401",
244               "--chat.backend.kafka.haproxy-map=/usr/local/etc/haproxy/sharding.map"
245           )
246           .withExposedPorts(8080)
247           .dependsOn(KAFKA)
248           .waitingFor(Wait.forLogMessage(".*Started\\ ChatBackendApplication.*\\n", 1))
249           .withLogConsumer(new Slf4jLogConsumer(log, true).withPrefix("BACKEND-1"));
250
251   static GenericContainer BACKEND_2 =
252       new GenericContainer(DockerImageName.parse("juplo/chat-backend:0.0.1-SNAPSHOT"))
253           .withImagePullPolicy(NEVER_PULL)
254           .withNetwork(NETWORK)
255           .withNetworkAliases("backend-2")
256           .withCommand(
257               "--chat.backend.instance-id=backend-2",
258               "--chat.backend.services=kafka",
259               "--chat.backend.kafka.bootstrap-servers=kafka:9999",
260               "--chat.backend.kafka.instance-uri=http://backend-2:8080",
261               "--chat.backend.kafka.num-partitions=10",
262               "--chat.backend.kafka.client-id-prefix=B2",
263               "--chat.backend.kafka.haproxy-runtime-api=haproxy:8401",
264               "--chat.backend.kafka.haproxy-map=/usr/local/etc/haproxy/sharding.map"
265           )
266           .withExposedPorts(8080)
267           .dependsOn(KAFKA)
268           .waitingFor(Wait.forLogMessage(".*Started\\ ChatBackendApplication.*\\n", 1))
269           .withLogConsumer(new Slf4jLogConsumer(log, true).withPrefix("BACKEND-2"));
270
271
272   static GenericContainer BACKEND_3 =
273       new GenericContainer(DockerImageName.parse("juplo/chat-backend:0.0.1-SNAPSHOT"))
274           .withImagePullPolicy(NEVER_PULL)
275           .withNetwork(NETWORK)
276           .withNetworkAliases("backend-3")
277           .withCommand(
278               "--chat.backend.instance-id=backend-3",
279               "--chat.backend.services=kafka",
280               "--chat.backend.kafka.bootstrap-servers=kafka:9999",
281               "--chat.backend.kafka.instance-uri=http://backend-3:8080",
282               "--chat.backend.kafka.num-partitions=10",
283               "--chat.backend.kafka.client-id-prefix=B3",
284               "--chat.backend.kafka.haproxy-runtime-api=haproxy:8401",
285               "--chat.backend.kafka.haproxy-map=/usr/local/etc/haproxy/sharding.map"
286           )
287           .withExposedPorts(8080)
288           .dependsOn(KAFKA)
289           .waitingFor(Wait.forLogMessage(".*Started\\ ChatBackendApplication.*\\n", 1))
290           .withLogConsumer(new Slf4jLogConsumer(log, true).withPrefix("BACKEND-3"));
291
292   static GenericContainer HAPROXY =
293       new GenericContainer(DockerImageName.parse("haproxytech/haproxy-debian:2.8"))
294           .withNetwork(NETWORK)
295           .withNetworkAliases("haproxy")
296           .withClasspathResourceMapping(
297               "haproxy.cfg",
298               "/usr/local/etc/haproxy/haproxy.cfg",
299               BindMode.READ_ONLY)
300           .withClasspathResourceMapping(
301               "sharding.map",
302               "/usr/local/etc/haproxy/sharding.map",
303               BindMode.READ_WRITE)
304           .withExposedPorts(8400, 8401, 8404)
305           .withLogConsumer(new Slf4jLogConsumer(log, true).withPrefix("HAPROXY"));
306
307   @EqualsAndHashCode
308   @ToString
309   class User
310   {
311     @Getter
312     private final String name;
313     private int serial = 0;
314
315
316     User (String name)
317     {
318       this.name = name;
319     }
320
321
322     int nextSerial()
323     {
324       return ++serial;
325     }
326   }
327
328   @Getter
329   @Setter
330   static class StatusTo
331   {
332     String status;
333   }
334
335   ParameterizedTypeReference<ServerSentEvent<String>> sseType = new ParameterizedTypeReference<>() {};
336 }