test: HandoverIT-POC - Messages are written to 23 chat-rooms instead of 1
[demos/kafka/chat] / src / test / java / de / juplo / kafka / chat / backend / AbstractHandoverIT.java
1 package de.juplo.kafka.chat.backend;
2
3 import de.juplo.kafka.chat.backend.api.ChatRoomInfoTo;
4 import lombok.extern.slf4j.Slf4j;
5 import org.junit.jupiter.api.BeforeEach;
6 import org.junit.jupiter.api.Test;
7 import org.springframework.core.ParameterizedTypeReference;
8 import org.springframework.http.HttpStatus;
9 import org.springframework.http.MediaType;
10 import org.springframework.http.codec.ServerSentEvent;
11 import org.springframework.web.reactive.function.client.WebClient;
12 import org.testcontainers.junit.jupiter.Testcontainers;
13 import reactor.core.publisher.Flux;
14 import reactor.core.publisher.Mono;
15
16
17 @Testcontainers
18 @Slf4j
19 public abstract class AbstractHandoverIT
20 {
21   static final ParameterizedTypeReference<ServerSentEvent<String>> SSE_TYPE = new ParameterizedTypeReference<>() {};
22   static final int NUM_CHATROOMS = 23;
23
24
25   private final AbstractHandoverITContainers containers;
26
27
28   AbstractHandoverIT(AbstractHandoverITContainers containers)
29   {
30     this.containers = containers;
31   }
32
33
34   @Test
35   void test() throws InterruptedException
36   {
37     ChatRoomInfoTo[] chatRooms = Flux
38         .range(0, NUM_CHATROOMS)
39         .flatMap(i -> createChatRoom("#" + i))
40         .toStream()
41         .toArray(size -> new ChatRoomInfoTo[size]);
42
43     TestClient testClient = new TestClient(
44         containers.haproxy.getMappedPort(8400),
45         chatRooms,
46         "nerd");
47     testClient.run();
48
49     Flux
50         .fromArray(chatRooms)
51         .flatMap(chatRoom ->receiveMessages(chatRoom).take(100))
52         .doOnNext(message -> log.info("message: {}", message))
53         .then()
54         .block();
55   }
56
57   Mono<ChatRoomInfoTo> createChatRoom(String name)
58   {
59     return webClient
60         .post()
61         .uri("/create")
62         .contentType(MediaType.TEXT_PLAIN)
63         .bodyValue(name)
64         .accept(MediaType.APPLICATION_JSON)
65         .exchangeToMono(response ->
66         {
67           if (response.statusCode().equals(HttpStatus.OK))
68           {
69             return response.bodyToMono(ChatRoomInfoTo.class);
70           }
71           else
72           {
73             return response.createError();
74           }
75         });
76   }
77
78   Flux<ServerSentEvent<String>> receiveMessages(ChatRoomInfoTo chatRoom)
79   {
80     return webClient
81         .get()
82         .uri(
83             "/{chatRoomId}/listen",
84             chatRoom.getId())
85         .accept(MediaType.TEXT_EVENT_STREAM)
86         .retrieve()
87         .bodyToFlux(SSE_TYPE);
88   }
89
90
91   WebClient webClient;
92
93   @BeforeEach
94   void setUp() throws Exception
95   {
96     containers.setUp();
97
98     Integer port = containers.haproxy.getMappedPort(8400);
99     webClient = WebClient.create("http://localhost:" + port);
100   }
101 }