test: HandoverIT-POC - setup without static containers
[demos/kafka/chat] / src / test / java / de / juplo / kafka / chat / backend / KafkaHandoverIT.java
1 package de.juplo.kafka.chat.backend;
2
3 import lombok.extern.slf4j.Slf4j;
4 import org.awaitility.Awaitility;
5 import org.junit.jupiter.api.BeforeEach;
6 import org.springframework.http.HttpStatus;
7 import org.springframework.web.reactive.function.client.WebClient;
8 import org.testcontainers.containers.*;
9 import org.testcontainers.containers.output.Slf4jLogConsumer;
10 import org.testcontainers.containers.wait.strategy.Wait;
11 import org.testcontainers.utility.DockerImageName;
12 import reactor.core.publisher.Mono;
13
14 import java.io.IOException;
15 import java.time.Duration;
16
17
18 @Slf4j
19 class KafkaHandoverIT extends AbstractHandoverIT
20 {
21   @BeforeEach
22   void setUpWebClient() throws IOException, InterruptedException
23   {
24     kafka.start();
25     haproxy.start();
26
27     Container.ExecResult result;
28     result = kafka.execInContainer(
29         "kafka-topics",
30         "--bootstrap-server",
31         "kafka:9999",
32         "--create",
33         "--topic",
34         "info_channel",
35         "--partitions",
36         "3");
37     log.info(
38         "EXIT-CODE={}, STDOUT={}, STDERR={}",
39         result.getExitCode(),
40         result.getStdout(),
41         result.getStdout());
42     result = kafka.execInContainer(
43         "kafka-topics",
44         "--bootstrap-server",
45         "kafka:9999",
46         "--create",
47         "--topic",
48         "data_channel",
49         "--partitions",
50         "10");
51     log.info(
52         "EXIT-CODE={}, STDOUT={}, STDERR={}",
53         result.getExitCode(),
54         result.getStdout(),
55         result.getStdout());
56
57     backend1.start();
58     // backend2.start();
59     // backend3.start();
60
61     Integer port = haproxy.getMappedPort(8400);
62     webClient = WebClient.create("http://localhost:" + port);
63
64     Awaitility
65         .await()
66         .atMost(Duration.ofMinutes(10))
67         .until(() -> WebClient
68             .create("http://localhost:" + backend1.getMappedPort(8080))
69             .get()
70             .uri("/actuator/health")
71             .exchangeToMono(response ->
72             {
73               if (response.statusCode().equals(HttpStatus.OK))
74               {
75                 return response
76                     .bodyToMono(StatusTo.class)
77                     .map(StatusTo::getStatus)
78                     .map(status -> status.equalsIgnoreCase("UP"));
79               }
80               else
81               {
82                 return Mono.just(false);
83               }
84             })
85             .block());
86
87     haproxy
88         .getDockerClient()
89         .killContainerCmd(haproxy.getContainerId())
90         .withSignal("HUP")
91         .exec();
92
93
94     Awaitility
95         .await()
96         .atMost(Duration.ofMinutes(10))
97         .until(() -> webClient
98             .get()
99             .uri("/actuator/health")
100             .exchangeToMono(response ->
101             {
102               if (response.statusCode().equals(HttpStatus.OK))
103               {
104                 return response
105                     .bodyToMono(StatusTo.class)
106                     .map(StatusTo::getStatus)
107                     .map(status -> status.equalsIgnoreCase("UP"));
108               }
109               else
110               {
111                 return Mono.just(false);
112               }
113             })
114             .block());
115   }
116
117
118   Network network = Network.newNetwork();
119
120   KafkaContainer kafka =
121       new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.4.0"))
122           .withNetwork(network)
123           .withNetworkAliases("kafka")
124           .withListener(() -> "kafka:9999")
125           .withKraft()
126           .waitingFor(Wait.forLogMessage(".*Kafka\\ Server\\ started.*\\n", 1))
127           .withLogConsumer(new Slf4jLogConsumer(log, true).withPrefix("KAFKA"));
128
129   GenericContainer backend1 =
130       new GenericContainer(DockerImageName.parse("juplo/chat-backend:0.0.1-SNAPSHOT"))
131           .withImagePullPolicy(NEVER_PULL)
132           .withNetwork(network)
133           .withNetworkAliases("backend-1")
134           .withCommand(
135               "--chat.backend.instance-id=backend-1",
136               "--chat.backend.services=kafka",
137               "--chat.backend.kafka.bootstrap-servers=kafka:9999",
138               "--chat.backend.kafka.instance-uri=http://backend-1:8080",
139               "--chat.backend.kafka.num-partitions=10",
140               "--chat.backend.kafka.client-id-prefix=B1",
141               "--chat.backend.kafka.haproxy-runtime-api=haproxy:8401",
142               "--chat.backend.kafka.haproxy-map=/usr/local/etc/haproxy/sharding.map"
143           )
144           .withExposedPorts(8080)
145           .waitingFor(Wait.forLogMessage(".*Started\\ ChatBackendApplication.*\\n", 1))
146           .withLogConsumer(new Slf4jLogConsumer(log, true).withPrefix("BACKEND-1"));
147
148   GenericContainer backend2 =
149       new GenericContainer(DockerImageName.parse("juplo/chat-backend:0.0.1-SNAPSHOT"))
150           .withImagePullPolicy(NEVER_PULL)
151           .withNetwork(network)
152           .withNetworkAliases("backend-2")
153           .withCommand(
154               "--chat.backend.instance-id=backend-2",
155               "--chat.backend.services=kafka",
156               "--chat.backend.kafka.bootstrap-servers=kafka:9999",
157               "--chat.backend.kafka.instance-uri=http://backend-2:8080",
158               "--chat.backend.kafka.num-partitions=10",
159               "--chat.backend.kafka.client-id-prefix=B2",
160               "--chat.backend.kafka.haproxy-runtime-api=haproxy:8401",
161               "--chat.backend.kafka.haproxy-map=/usr/local/etc/haproxy/sharding.map"
162           )
163           .withExposedPorts(8080)
164           .waitingFor(Wait.forLogMessage(".*Started\\ ChatBackendApplication.*\\n", 1))
165           .withLogConsumer(new Slf4jLogConsumer(log, true).withPrefix("BACKEND-2"));
166
167   GenericContainer backend3 =
168       new GenericContainer(DockerImageName.parse("juplo/chat-backend:0.0.1-SNAPSHOT"))
169           .withImagePullPolicy(NEVER_PULL)
170           .withNetwork(network)
171           .withNetworkAliases("backend-3")
172           .withCommand(
173               "--chat.backend.instance-id=backend-3",
174               "--chat.backend.services=kafka",
175               "--chat.backend.kafka.bootstrap-servers=kafka:9999",
176               "--chat.backend.kafka.instance-uri=http://backend-3:8080",
177               "--chat.backend.kafka.num-partitions=10",
178               "--chat.backend.kafka.client-id-prefix=B3",
179               "--chat.backend.kafka.haproxy-runtime-api=haproxy:8401",
180               "--chat.backend.kafka.haproxy-map=/usr/local/etc/haproxy/sharding.map"
181           )
182           .withExposedPorts(8080)
183           .waitingFor(Wait.forLogMessage(".*Started\\ ChatBackendApplication.*\\n", 1))
184           .withLogConsumer(new Slf4jLogConsumer(log, true).withPrefix("BACKEND-3"));
185
186   GenericContainer haproxy =
187       new GenericContainer(DockerImageName.parse("haproxytech/haproxy-debian:2.8"))
188           .withNetwork(network)
189           .withNetworkAliases("haproxy")
190           .withClasspathResourceMapping(
191               "haproxy.cfg",
192               "/usr/local/etc/haproxy/haproxy.cfg",
193               BindMode.READ_ONLY)
194           .withClasspathResourceMapping(
195               "sharding.map",
196               "/usr/local/etc/haproxy/sharding.map",
197               BindMode.READ_WRITE)
198           .withExposedPorts(8400, 8401, 8404)
199           .withLogConsumer(new Slf4jLogConsumer(log, true).withPrefix("HAPROXY"));
200 }