WIP:test: HandoverIT-POC - splitted up code into smaller classes -- ALIGN
[demos/kafka/chat] / src / test / java / de / juplo / kafka / chat / backend / HandoverIT.java
1 package de.juplo.kafka.chat.backend;
2
3 import de.juplo.kafka.chat.backend.api.ChatRoomInfoTo;
4 import de.juplo.kafka.chat.backend.api.MessageTo;
5 import lombok.extern.slf4j.Slf4j;
6 import org.junit.jupiter.api.BeforeEach;
7 import org.junit.jupiter.api.Test;
8 import org.springframework.core.ParameterizedTypeReference;
9 import org.springframework.http.HttpStatus;
10 import org.springframework.http.MediaType;
11 import org.springframework.http.codec.ServerSentEvent;
12 import org.springframework.web.reactive.function.client.WebClient;
13 import org.testcontainers.junit.jupiter.Testcontainers;
14 import reactor.core.publisher.Flux;
15 import reactor.core.publisher.Mono;
16 import reactor.util.retry.Retry;
17
18 import java.time.Duration;
19 import java.util.stream.IntStream;
20
21
22 @Testcontainers
23 @Slf4j
24 public abstract class HandoverIT
25 {
26   static final ParameterizedTypeReference<ServerSentEvent<String>> SSE_TYPE = new ParameterizedTypeReference<>() {};
27
28
29   private final HandoverITContainers containers;
30
31
32   HandoverIT(HandoverITContainers containers)
33   {
34     this.containers = containers;
35   }
36
37
38   @Test
39   void test() throws InterruptedException
40   {
41     ChatRoomInfoTo chatRoom = createChatRoom("bar").block();
42     TestClient testClient = new TestClient(
43         containers.haproxy.getMappedPort(8400),
44         "nerd");
45     IntStream
46         .rangeClosed(1,100)
47         .mapToObj(i ->testClient.sendMessage(chatRoom, "Message #" + i))
48         .map(result -> result
49             .map(MessageTo::toString)
50             .retryWhen(Retry.fixedDelay(10, Duration.ofSeconds(1)))
51             .block())
52         .forEach(result -> log.info("{}", result));
53
54     receiveMessages(chatRoom)
55         .take(100)
56         .doOnNext(message -> log.info("message: {}", message))
57         .then()
58         .block();
59   }
60
61   Mono<ChatRoomInfoTo> createChatRoom(String name)
62   {
63     return webClient
64         .post()
65         .uri("/create")
66         .contentType(MediaType.TEXT_PLAIN)
67         .bodyValue(name)
68         .accept(MediaType.APPLICATION_JSON)
69         .exchangeToMono(response ->
70         {
71           if (response.statusCode().equals(HttpStatus.OK))
72           {
73             return response.bodyToMono(ChatRoomInfoTo.class);
74           }
75           else
76           {
77             return response.createError();
78           }
79         });
80   }
81
82   Flux<ServerSentEvent<String>> receiveMessages(ChatRoomInfoTo chatRoom)
83   {
84     return webClient
85         .get()
86         .uri(
87             "/{chatRoomId}/listen",
88             chatRoom.getId())
89         .accept(MediaType.TEXT_EVENT_STREAM)
90         .retrieve()
91         .bodyToFlux(SSE_TYPE);
92   }
93
94
95   WebClient webClient;
96
97   @BeforeEach
98   void setUp() throws Exception
99   {
100     containers.setUp();
101
102     Integer port = containers.haproxy.getMappedPort(8400);
103     webClient = WebClient.create("http://localhost:" + port);
104   }
105 }