import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import org.springframework.core.ParameterizedTypeReference;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
+import org.springframework.http.codec.ServerSentEvent;
import org.springframework.web.reactive.function.client.WebClient;
-import org.testcontainers.containers.GenericContainer;
-import org.testcontainers.containers.Network;
import org.testcontainers.junit.jupiter.Testcontainers;
+import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.retry.Retry;
@Slf4j
public abstract class AbstractHandoverIT
{
+ static final ParameterizedTypeReference<ServerSentEvent<String>> SSE_TYPE = new ParameterizedTypeReference<>() {};
+
+
private final AbstractContainerTemplates containerTemplates;
void test() throws InterruptedException
{
ChatRoomInfoTo chatRoom = createChatRoom("bar").block();
- User user = new User("nerd");
- IntStream
- .rangeClosed(1,100)
- .mapToObj(i ->sendMessage(chatRoom, user, "Message #" + i))
- .map(result -> result
- .map(MessageTo::toString)
- .retryWhen(Retry.fixedDelay(10, Duration.ofSeconds(1)))
- .block())
- .forEach(result -> log.info("{}", result));
receiveMessages(chatRoom)
.take(100)
.block();
}
- final Network network = Network.newNetwork();
-
- GenericContainer haproxy, backend1, backend2, backend3;
-
Mono<ChatRoomInfoTo> createChatRoom(String name)
{
return webClient
});
}
+ Flux<ServerSentEvent<String>> receiveMessages(ChatRoomInfoTo chatRoom)
+ {
+ return webClient
+ .get()
+ .uri(
+ "/{chatRoomId}/listen",
+ chatRoom.getId())
+ .accept(MediaType.TEXT_EVENT_STREAM)
+ .retrieve()
+ .bodyToFlux(SSE_TYPE);
+ }
+
WebClient webClient;
{
containerTemplates.setUp();
-
- Integer port = haproxy.getMappedPort(8400);
+ Integer port = containerTemplates.haproxy.getMappedPort(8400);
webClient = WebClient.create("http://localhost:" + port);
}
}
@Slf4j
public class TestClient
{
- static final ParameterizedTypeReference<ServerSentEvent<String>> SSE_TYPE = new ParameterizedTypeReference<>() {};
-
-
- Mono<ChatRoomInfoTo> createChatRoom(String name)
- {
- return webClient
- .post()
- .uri("/create")
- .contentType(MediaType.TEXT_PLAIN)
- .bodyValue(name)
- .accept(MediaType.APPLICATION_JSON)
- .exchangeToMono(response ->
- {
- if (response.statusCode().equals(HttpStatus.OK))
- {
- return response.bodyToMono(ChatRoomInfoTo.class);
- }
- else
- {
- return response.createError();
- }
- });
- }
-
Mono<MessageTo> sendMessage(
ChatRoomInfoTo chatRoom,
User user,
});
}
- Flux<ServerSentEvent<String>> receiveMessages(ChatRoomInfoTo chatRoom)
- {
- return webClient
- .get()
- .uri(
- "/{chatRoomId}/listen",
- chatRoom.getId())
- .accept(MediaType.TEXT_EVENT_STREAM)
- .retrieve()
- .bodyToFlux(SSE_TYPE);
- }
-
private final WebClient webClient;