TMP:test -- `ChatRoomDataTest`
[demos/kafka/chat] / src / test / java / de / juplo / kafka / chat / backend / AbstractHandoverITContainers.java
1 package de.juplo.kafka.chat.backend;
2
3 import lombok.extern.slf4j.Slf4j;
4 import org.awaitility.Awaitility;
5 import org.springframework.http.HttpStatus;
6 import org.springframework.web.reactive.function.client.WebClient;
7 import org.testcontainers.containers.BindMode;
8 import org.testcontainers.containers.GenericContainer;
9 import org.testcontainers.containers.Network;
10 import org.testcontainers.containers.output.Slf4jLogConsumer;
11 import org.testcontainers.containers.wait.strategy.Wait;
12 import org.testcontainers.images.ImagePullPolicy;
13 import org.testcontainers.utility.DockerImageName;
14 import reactor.core.publisher.Mono;
15
16 import java.time.Duration;
17 import java.util.Arrays;
18
19
20 @Slf4j
21 public abstract class AbstractHandoverITContainers
22 {
23   static final ImagePullPolicy NEVER_PULL = imageName -> false;
24
25
26   final Network network = Network.newNetwork();
27   final GenericContainer haproxy, backend1, backend2, backend3;
28
29
30   AbstractHandoverITContainers()
31   {
32     haproxy = createHaproxyContainer();
33     backend1 = createBackendContainer("1");
34     backend2 = createBackendContainer("2");
35     backend3 = createBackendContainer("3");
36   }
37
38
39   void setUpExtra() throws Exception
40   {
41     log.info("This setup does not need any extra containers");
42   }
43
44   void setUp() throws Exception
45   {
46     setUpExtra();
47     haproxy.start();
48   }
49
50   void startBackend(
51       GenericContainer backend,
52       TestWriter[] testWriters)
53   {
54     backend.start();
55
56     int[] numSentMessages = Arrays
57         .stream(testWriters)
58         .mapToInt(testWriter -> testWriter.getNumSentMessages())
59         .toArray();
60
61     Awaitility
62         .await()
63         .atMost(Duration.ofSeconds(30))
64         .until(() -> WebClient
65             .create("http://localhost:" + backend.getMappedPort(8080))
66             .get()
67             .uri("/actuator/health")
68             .exchangeToMono(response ->
69             {
70               if (response.statusCode().equals(HttpStatus.OK))
71               {
72                 return response
73                     .bodyToMono(StatusTo.class)
74                     .map(StatusTo::getStatus)
75                     .map(status -> status.equalsIgnoreCase("UP"));
76               }
77               else
78               {
79                 return Mono.just(false);
80               }
81             })
82             .block());
83
84     haproxy
85         .getDockerClient()
86         .killContainerCmd(haproxy.getContainerId())
87         .withSignal("HUP")
88         .exec();
89
90     Awaitility
91         .await()
92         .atMost(Duration.ofSeconds(30))
93         .until(() -> WebClient
94             .create("http://localhost:" + haproxy.getMappedPort(8400))
95             .get()
96             .uri("/actuator/health")
97             .exchangeToMono(response ->
98             {
99               if (response.statusCode().equals(HttpStatus.OK))
100               {
101                 return response
102                     .bodyToMono(StatusTo.class)
103                     .map(StatusTo::getStatus)
104                     .map(status -> status.equalsIgnoreCase("UP"));
105               }
106               else
107               {
108                 return Mono.just(false);
109               }
110             })
111             .block());
112
113     Awaitility
114         .await()
115         .atMost(Duration.ofSeconds(30))
116         .until(() ->
117         {
118           for (int i = 0; i < testWriters.length; i++)
119           {
120             TestWriter testWriter = testWriters[i];
121             int sentTotal = testWriter.getNumSentMessages();
122             if (sentTotal == numSentMessages[i])
123             {
124               log.info(
125                   "No progress for {}: sent-before={}, sent-total={}",
126                   testWriter,
127                   numSentMessages[i],
128                   sentTotal);
129               return false;
130             }
131           }
132
133           return true;
134         });
135   }
136
137   abstract String[] getBackendCommand();
138
139   final GenericContainer createHaproxyContainer()
140   {
141     return new GenericContainer(DockerImageName.parse("haproxytech/haproxy-debian:2.8"))
142         .withNetwork(network)
143         .withNetworkAliases("haproxy")
144         .withClasspathResourceMapping(
145             "haproxy.cfg",
146             "/usr/local/etc/haproxy/haproxy.cfg",
147             BindMode.READ_ONLY)
148         .withClasspathResourceMapping(
149             "sharding.map",
150             "/usr/local/etc/haproxy/sharding.map",
151             BindMode.READ_WRITE)
152         .withExposedPorts(8400, 8401, 8404)
153         .withLogConsumer(new Slf4jLogConsumer(log, true).withPrefix("HAPROXY"));
154   }
155
156   final GenericContainer createBackendContainer(String id)
157   {
158     return new GenericContainer(DockerImageName.parse("juplo/chat-backend:0.0.1-SNAPSHOT"))
159       .withImagePullPolicy(NEVER_PULL)
160       .withNetwork(network)
161       .withNetworkAliases("backend-ID".replaceAll("ID", id))
162       .withCommand(Arrays.stream(getBackendCommand())
163           .map(commandPart -> commandPart.replaceAll("ID", id))
164           .toArray(size -> new String[size]))
165       .withExposedPorts(8080)
166       .waitingFor(Wait.forLogMessage(".*Started\\ ChatBackendApplication.*\\n", 1))
167       .withLogConsumer(new Slf4jLogConsumer(
168           log,
169           true
170           )
171           .withPrefix("BACKEND-ID".replaceAll("ID", id)));
172   }
173 }