1 package de.juplo.kafka.chat.backend;
3 import de.juplo.kafka.chat.backend.api.ChatRoomInfoTo;
4 import de.juplo.kafka.chat.backend.api.MessageTo;
5 import lombok.EqualsAndHashCode;
8 import lombok.ToString;
9 import lombok.extern.slf4j.Slf4j;
10 import org.junit.jupiter.api.Test;
11 import org.springframework.core.ParameterizedTypeReference;
12 import org.springframework.http.HttpStatus;
13 import org.springframework.http.MediaType;
14 import org.springframework.http.codec.ServerSentEvent;
15 import org.springframework.web.reactive.function.client.WebClient;
16 import org.testcontainers.images.ImagePullPolicy;
17 import org.testcontainers.junit.jupiter.Testcontainers;
18 import reactor.core.publisher.Flux;
19 import reactor.core.publisher.Mono;
20 import reactor.util.retry.Retry;
22 import java.time.Duration;
23 import java.util.stream.IntStream;
28 public abstract class AbstractHandoverIT
30 static final ImagePullPolicy NEVER_PULL = imageName -> false;
31 static final ParameterizedTypeReference<ServerSentEvent<String>> SSE_TYPE = new ParameterizedTypeReference<>() {};
35 void test() throws InterruptedException
37 ChatRoomInfoTo chatRoom = createChatRoom("bar").block();
38 User user = new User("nerd");
41 .mapToObj(i ->sendMessage(chatRoom, user, "Message #" + i))
43 .map(MessageTo::toString)
44 .retryWhen(Retry.fixedDelay(10, Duration.ofSeconds(1)))
46 .forEach(result -> log.info("{}", result));
48 receiveMessages(chatRoom)
50 .doOnNext(message -> log.info("message: {}", message))
55 Mono<ChatRoomInfoTo> createChatRoom(String name)
60 .contentType(MediaType.TEXT_PLAIN)
62 .accept(MediaType.APPLICATION_JSON)
63 .exchangeToMono(response ->
65 if (response.statusCode().equals(HttpStatus.OK))
67 return response.bodyToMono(ChatRoomInfoTo.class);
71 return response.createError();
76 Mono<MessageTo> sendMessage(
77 ChatRoomInfoTo chatRoom,
84 "/{chatRoomId}/{username}/{serial}",
88 .contentType(MediaType.TEXT_PLAIN)
89 .accept(MediaType.APPLICATION_JSON)
91 .exchangeToMono(response ->
93 if (response.statusCode().equals(HttpStatus.OK))
95 return response.bodyToMono(MessageTo.class);
99 return response.createError();
104 Flux<ServerSentEvent<String>> receiveMessages(ChatRoomInfoTo chatRoom)
109 "/{chatRoomId}/listen",
111 .accept(MediaType.TEXT_EVENT_STREAM)
113 .bodyToFlux(SSE_TYPE);
125 private final String name;
126 private int serial = 0;
143 static class StatusTo