test: HandoverIT-POC - Split up code into smaller classes -- ALIGN
[demos/kafka/chat] / src / test / java / de / juplo / kafka / chat / backend / AbstractHandoverIT.java
1 package de.juplo.kafka.chat.backend;
2
3 import de.juplo.kafka.chat.backend.api.ChatRoomInfoTo;
4 import lombok.extern.slf4j.Slf4j;
5 import org.junit.jupiter.api.BeforeEach;
6 import org.junit.jupiter.api.Test;
7 import org.springframework.core.ParameterizedTypeReference;
8 import org.springframework.http.HttpStatus;
9 import org.springframework.http.MediaType;
10 import org.springframework.http.codec.ServerSentEvent;
11 import org.springframework.web.reactive.function.client.WebClient;
12 import org.testcontainers.junit.jupiter.Testcontainers;
13 import reactor.core.publisher.Flux;
14 import reactor.core.publisher.Mono;
15
16
17 @Testcontainers
18 @Slf4j
19 public abstract class AbstractHandoverIT
20 {
21   static final ParameterizedTypeReference<ServerSentEvent<String>> SSE_TYPE = new ParameterizedTypeReference<>() {};
22
23
24   private final AbstractHandoverITContainers containers;
25
26
27   AbstractHandoverIT(AbstractHandoverITContainers containers)
28   {
29     this.containers = containers;
30   }
31
32
33   @Test
34   void test() throws InterruptedException
35   {
36     ChatRoomInfoTo chatRoom = createChatRoom("bar").block();
37     TestClient testClient = new TestClient(
38         containers.haproxy.getMappedPort(8400),
39         chatRoom,
40         "nerd");
41     testClient.run();
42
43     receiveMessages(chatRoom)
44         .take(100)
45         .doOnNext(message -> log.info("message: {}", message))
46         .then()
47         .block();
48   }
49
50   Mono<ChatRoomInfoTo> createChatRoom(String name)
51   {
52     return webClient
53         .post()
54         .uri("/create")
55         .contentType(MediaType.TEXT_PLAIN)
56         .bodyValue(name)
57         .accept(MediaType.APPLICATION_JSON)
58         .exchangeToMono(response ->
59         {
60           if (response.statusCode().equals(HttpStatus.OK))
61           {
62             return response.bodyToMono(ChatRoomInfoTo.class);
63           }
64           else
65           {
66             return response.createError();
67           }
68         });
69   }
70
71   Flux<ServerSentEvent<String>> receiveMessages(ChatRoomInfoTo chatRoom)
72   {
73     return webClient
74         .get()
75         .uri(
76             "/{chatRoomId}/listen",
77             chatRoom.getId())
78         .accept(MediaType.TEXT_EVENT_STREAM)
79         .retrieve()
80         .bodyToFlux(SSE_TYPE);
81   }
82
83
84   WebClient webClient;
85
86   @BeforeEach
87   void setUp() throws Exception
88   {
89     containers.setUp();
90
91     Integer port = containers.haproxy.getMappedPort(8400);
92     webClient = WebClient.create("http://localhost:" + port);
93   }
94 }