test: HandoverIT-POC - First working setup for the planned test
[demos/kafka/chat] / src / test / java / de / juplo / kafka / chat / backend / AbstractHandoverIT.java
1 package de.juplo.kafka.chat.backend;
2
3 import de.juplo.kafka.chat.backend.api.ChatRoomInfoTo;
4 import de.juplo.kafka.chat.backend.api.MessageTo;
5 import lombok.EqualsAndHashCode;
6 import lombok.Getter;
7 import lombok.Setter;
8 import lombok.ToString;
9 import lombok.extern.slf4j.Slf4j;
10 import org.junit.jupiter.api.Test;
11 import org.springframework.core.ParameterizedTypeReference;
12 import org.springframework.http.HttpStatus;
13 import org.springframework.http.MediaType;
14 import org.springframework.http.codec.ServerSentEvent;
15 import org.springframework.web.reactive.function.client.WebClient;
16 import org.testcontainers.images.ImagePullPolicy;
17 import org.testcontainers.junit.jupiter.Testcontainers;
18 import reactor.core.publisher.Flux;
19 import reactor.core.publisher.Mono;
20 import reactor.util.retry.Retry;
21
22 import java.time.Duration;
23 import java.util.stream.IntStream;
24
25
26 @Testcontainers
27 @Slf4j
28 public abstract class AbstractHandoverIT
29 {
30   static final ImagePullPolicy NEVER_PULL = imageName -> false;
31   static final ParameterizedTypeReference<ServerSentEvent<String>> SSE_TYPE = new ParameterizedTypeReference<>() {};
32
33
34   @Test
35   void test() throws InterruptedException
36   {
37     ChatRoomInfoTo chatRoom = createChatRoom("bar").block();
38     User user = new User("nerd");
39     IntStream
40         .rangeClosed(1,100)
41         .mapToObj(i ->sendMessage(chatRoom, user, "Message #" + i))
42         .map(result -> result
43             .map(MessageTo::toString)
44             .retryWhen(Retry.fixedDelay(10, Duration.ofSeconds(1)))
45             .block())
46         .forEach(result -> log.info("{}", result));
47
48     receiveMessages(chatRoom)
49         .take(100)
50         .doOnNext(message -> log.info("message: {}", message))
51         .then()
52         .block();
53   }
54
55   Mono<ChatRoomInfoTo> createChatRoom(String name)
56   {
57     return webClient
58         .post()
59         .uri("/create")
60         .contentType(MediaType.TEXT_PLAIN)
61         .bodyValue(name)
62         .accept(MediaType.APPLICATION_JSON)
63         .exchangeToMono(response ->
64         {
65           if (response.statusCode().equals(HttpStatus.OK))
66           {
67             return response.bodyToMono(ChatRoomInfoTo.class);
68           }
69           else
70           {
71             return response.createError();
72           }
73         });
74   }
75
76   Mono<MessageTo> sendMessage(
77       ChatRoomInfoTo chatRoom,
78       User user,
79       String message)
80   {
81     return webClient
82         .put()
83         .uri(
84             "/{chatRoomId}/{username}/{serial}",
85             chatRoom.getId(),
86             user.getName(),
87             user.nextSerial())
88         .contentType(MediaType.TEXT_PLAIN)
89         .accept(MediaType.APPLICATION_JSON)
90         .bodyValue(message)
91         .exchangeToMono(response ->
92         {
93           if (response.statusCode().equals(HttpStatus.OK))
94           {
95             return response.bodyToMono(MessageTo.class);
96           }
97           else
98           {
99             return response.createError();
100           }
101         });
102   }
103
104   Flux<ServerSentEvent<String>> receiveMessages(ChatRoomInfoTo chatRoom)
105   {
106     return webClient
107         .get()
108         .uri(
109             "/{chatRoomId}/listen",
110             chatRoom.getId())
111         .accept(MediaType.TEXT_EVENT_STREAM)
112         .retrieve()
113         .bodyToFlux(SSE_TYPE);
114   }
115
116
117   WebClient webClient;
118
119
120   @EqualsAndHashCode
121   @ToString
122   class User
123   {
124     @Getter
125     private final String name;
126     private int serial = 0;
127
128
129     User (String name)
130     {
131       this.name = name;
132     }
133
134
135     int nextSerial()
136     {
137       return ++serial;
138     }
139   }
140
141   @Getter
142   @Setter
143   static class StatusTo
144   {
145     String status;
146   }
147 }