package de.juplo.kafka.chat.backend;
+import de.juplo.kafka.chat.backend.api.ChatRoomInfoTo;
+import de.juplo.kafka.chat.backend.api.MessageTo;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.springframework.http.HttpStatus;
+import org.springframework.http.MediaType;
+import org.springframework.web.reactive.function.client.WebClient;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.Network;
import org.testcontainers.junit.jupiter.Testcontainers;
+import reactor.core.publisher.Mono;
+import reactor.util.retry.Retry;
+
+import java.time.Duration;
+import java.util.stream.IntStream;
@Testcontainers
}
+ @Test
+ void test() throws InterruptedException
+ {
+ ChatRoomInfoTo chatRoom = createChatRoom("bar").block();
+ User user = new User("nerd");
+ IntStream
+ .rangeClosed(1,100)
+ .mapToObj(i ->sendMessage(chatRoom, user, "Message #" + i))
+ .map(result -> result
+ .map(MessageTo::toString)
+ .retryWhen(Retry.fixedDelay(10, Duration.ofSeconds(1)))
+ .block())
+ .forEach(result -> log.info("{}", result));
+
+ receiveMessages(chatRoom)
+ .take(100)
+ .doOnNext(message -> log.info("message: {}", message))
+ .then()
+ .block();
+ }
+
+ final Network network = Network.newNetwork();
+
+ GenericContainer haproxy, backend1, backend2, backend3;
+
+ Mono<ChatRoomInfoTo> createChatRoom(String name)
+ {
+ return webClient
+ .post()
+ .uri("/create")
+ .contentType(MediaType.TEXT_PLAIN)
+ .bodyValue(name)
+ .accept(MediaType.APPLICATION_JSON)
+ .exchangeToMono(response ->
+ {
+ if (response.statusCode().equals(HttpStatus.OK))
+ {
+ return response.bodyToMono(ChatRoomInfoTo.class);
+ }
+ else
+ {
+ return response.createError();
+ }
+ });
+ }
+
+
+ WebClient webClient;
+
@BeforeEach
void setUp() throws Exception
{
containerTemplates.setUp();
+
+
+ Integer port = haproxy.getMappedPort(8400);
+ webClient = WebClient.create("http://localhost:" + port);
}
}