test: HandoverIT-POC - first working setup for the planned test
[demos/kafka/chat] / src / test / java / de / juplo / kafka / chat / backend / KafkaHandoverIT.java
1 package de.juplo.kafka.chat.backend;
2
3 import lombok.extern.slf4j.Slf4j;
4 import org.awaitility.Awaitility;
5 import org.junit.jupiter.api.BeforeAll;
6 import org.junit.jupiter.api.BeforeEach;
7 import org.springframework.http.HttpStatus;
8 import org.springframework.web.reactive.function.client.WebClient;
9 import org.testcontainers.containers.*;
10 import org.testcontainers.containers.output.Slf4jLogConsumer;
11 import org.testcontainers.containers.wait.strategy.Wait;
12 import org.testcontainers.utility.DockerImageName;
13 import reactor.core.publisher.Mono;
14
15 import java.io.IOException;
16 import java.time.Duration;
17
18
19 @Slf4j
20 class KafkaHandoverIT extends AbstractHandoverIT
21 {
22   @BeforeEach
23   void setUpWebClient()
24   {
25     Integer port = HAPROXY.getMappedPort(8400);
26     webClient = WebClient.create("http://localhost:" + port);
27
28     Awaitility
29         .await()
30         .atMost(Duration.ofMinutes(10))
31         .until(() -> WebClient
32             .create("http://localhost:" + BACKEND_1.getMappedPort(8080))
33             .get()
34             .uri("/actuator/health")
35             .exchangeToMono(response ->
36             {
37               if (response.statusCode().equals(HttpStatus.OK))
38               {
39                 return response
40                     .bodyToMono(StatusTo.class)
41                     .map(StatusTo::getStatus)
42                     .map(status -> status.equalsIgnoreCase("UP"));
43               }
44               else
45               {
46                 return Mono.just(false);
47               }
48             })
49             .block());
50
51     HAPROXY
52         .getDockerClient()
53         .killContainerCmd(HAPROXY.getContainerId())
54         .withSignal("HUP")
55         .exec();
56
57
58     Awaitility
59         .await()
60         .atMost(Duration.ofMinutes(10))
61         .until(() -> webClient
62             .get()
63             .uri("/actuator/health")
64             .exchangeToMono(response ->
65             {
66               if (response.statusCode().equals(HttpStatus.OK))
67               {
68                 return response
69                     .bodyToMono(StatusTo.class)
70                     .map(StatusTo::getStatus)
71                     .map(status -> status.equalsIgnoreCase("UP"));
72               }
73               else
74               {
75                 return Mono.just(false);
76               }
77             })
78             .block());
79   }
80
81
82   @BeforeAll
83   static void setUpDocker() throws IOException, InterruptedException
84   {
85     KAFKA.start();
86     HAPROXY.start();
87
88     Container.ExecResult result;
89     result = KAFKA.execInContainer(
90         "kafka-topics",
91         "--bootstrap-server",
92         "kafka:9999",
93         "--create",
94         "--topic",
95         "info_channel",
96         "--partitions",
97         "3");
98     log.info(
99         "EXIT-CODE={}, STDOUT={}, STDERR={}",
100         result.getExitCode(),
101         result.getStdout(),
102         result.getStdout());
103     result = KAFKA.execInContainer(
104         "kafka-topics",
105         "--bootstrap-server",
106         "kafka:9999",
107         "--create",
108         "--topic",
109         "data_channel",
110         "--partitions",
111         "10");
112     log.info(
113         "EXIT-CODE={}, STDOUT={}, STDERR={}",
114         result.getExitCode(),
115         result.getStdout(),
116         result.getStdout());
117
118     BACKEND_1.start();
119     // BACKEND_2.start();
120     // BACKEND_3.start();
121   }
122
123   static Network NETWORK = Network.newNetwork();
124
125   static KafkaContainer KAFKA =
126       new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.4.0"))
127           .withNetwork(NETWORK)
128           .withNetworkAliases("kafka")
129           .withListener(() -> "kafka:9999")
130           .withKraft()
131           .waitingFor(Wait.forLogMessage(".*Kafka\\ Server\\ started.*\\n", 1))
132           .withLogConsumer(new Slf4jLogConsumer(log, true).withPrefix("KAFKA"));
133
134   static GenericContainer BACKEND_1 =
135       new GenericContainer(DockerImageName.parse("juplo/chat-backend:0.0.1-SNAPSHOT"))
136           .withImagePullPolicy(NEVER_PULL)
137           .withNetwork(NETWORK)
138           .withNetworkAliases("backend-1")
139           .withCommand(
140               "--chat.backend.instance-id=backend-1",
141               "--chat.backend.services=kafka",
142               "--chat.backend.kafka.bootstrap-servers=kafka:9999",
143               "--chat.backend.kafka.instance-uri=http://backend-1:8080",
144               "--chat.backend.kafka.num-partitions=10",
145               "--chat.backend.kafka.client-id-prefix=B1",
146               "--chat.backend.kafka.haproxy-runtime-api=haproxy:8401",
147               "--chat.backend.kafka.haproxy-map=/usr/local/etc/haproxy/sharding.map"
148           )
149           .withExposedPorts(8080)
150           .dependsOn(KAFKA)
151           .waitingFor(Wait.forLogMessage(".*Started\\ ChatBackendApplication.*\\n", 1))
152           .withLogConsumer(new Slf4jLogConsumer(log, true).withPrefix("BACKEND-1"));
153
154   static GenericContainer BACKEND_2 =
155       new GenericContainer(DockerImageName.parse("juplo/chat-backend:0.0.1-SNAPSHOT"))
156           .withImagePullPolicy(NEVER_PULL)
157           .withNetwork(NETWORK)
158           .withNetworkAliases("backend-2")
159           .withCommand(
160               "--chat.backend.instance-id=backend-2",
161               "--chat.backend.services=kafka",
162               "--chat.backend.kafka.bootstrap-servers=kafka:9999",
163               "--chat.backend.kafka.instance-uri=http://backend-2:8080",
164               "--chat.backend.kafka.num-partitions=10",
165               "--chat.backend.kafka.client-id-prefix=B2",
166               "--chat.backend.kafka.haproxy-runtime-api=haproxy:8401",
167               "--chat.backend.kafka.haproxy-map=/usr/local/etc/haproxy/sharding.map"
168           )
169           .withExposedPorts(8080)
170           .dependsOn(KAFKA)
171           .waitingFor(Wait.forLogMessage(".*Started\\ ChatBackendApplication.*\\n", 1))
172           .withLogConsumer(new Slf4jLogConsumer(log, true).withPrefix("BACKEND-2"));
173
174
175   static GenericContainer BACKEND_3 =
176       new GenericContainer(DockerImageName.parse("juplo/chat-backend:0.0.1-SNAPSHOT"))
177           .withImagePullPolicy(NEVER_PULL)
178           .withNetwork(NETWORK)
179           .withNetworkAliases("backend-3")
180           .withCommand(
181               "--chat.backend.instance-id=backend-3",
182               "--chat.backend.services=kafka",
183               "--chat.backend.kafka.bootstrap-servers=kafka:9999",
184               "--chat.backend.kafka.instance-uri=http://backend-3:8080",
185               "--chat.backend.kafka.num-partitions=10",
186               "--chat.backend.kafka.client-id-prefix=B3",
187               "--chat.backend.kafka.haproxy-runtime-api=haproxy:8401",
188               "--chat.backend.kafka.haproxy-map=/usr/local/etc/haproxy/sharding.map"
189           )
190           .withExposedPorts(8080)
191           .dependsOn(KAFKA)
192           .waitingFor(Wait.forLogMessage(".*Started\\ ChatBackendApplication.*\\n", 1))
193           .withLogConsumer(new Slf4jLogConsumer(log, true).withPrefix("BACKEND-3"));
194
195   static GenericContainer HAPROXY =
196       new GenericContainer(DockerImageName.parse("haproxytech/haproxy-debian:2.8"))
197           .withNetwork(NETWORK)
198           .withNetworkAliases("haproxy")
199           .withClasspathResourceMapping(
200               "haproxy.cfg",
201               "/usr/local/etc/haproxy/haproxy.cfg",
202               BindMode.READ_ONLY)
203           .withClasspathResourceMapping(
204               "sharding.map",
205               "/usr/local/etc/haproxy/sharding.map",
206               BindMode.READ_WRITE)
207           .withExposedPorts(8400, 8401, 8404)
208           .withLogConsumer(new Slf4jLogConsumer(log, true).withPrefix("HAPROXY"));
209 }