package de.juplo.kafka.chat.backend;
import de.juplo.kafka.chat.backend.api.ChatRoomInfoTo;
+import de.juplo.kafka.chat.backend.api.MessageTo;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.testcontainers.junit.jupiter.Testcontainers;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
+import reactor.util.retry.Retry;
+
+import java.time.Duration;
+import java.util.stream.IntStream;
@Testcontainers
void test() throws InterruptedException
{
ChatRoomInfoTo chatRoom = createChatRoom("bar").block();
+ TestClient testClient = new TestClient(
+ containers.haproxy.getMappedPort(8400),
+ "nerd");
+ IntStream
+ .rangeClosed(1,100)
+ .mapToObj(i ->testClient.sendMessage(chatRoom, user, "Message #" + i))
+ .map(result -> result
+ .map(MessageTo::toString)
+ .retryWhen(Retry.fixedDelay(10, Duration.ofSeconds(1)))
+ .block())
+ .forEach(result -> log.info("{}", result));
receiveMessages(chatRoom)
.take(100)
import de.juplo.kafka.chat.backend.api.ChatRoomInfoTo;
import de.juplo.kafka.chat.backend.api.MessageTo;
import lombok.extern.slf4j.Slf4j;
-import org.springframework.core.ParameterizedTypeReference;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
-import org.springframework.http.codec.ServerSentEvent;
import org.springframework.web.reactive.function.client.WebClient;
-import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
{
Mono<MessageTo> sendMessage(
ChatRoomInfoTo chatRoom,
- User user,
String message)
{
return webClient
private final WebClient webClient;
+ private final User user;
- TestClient(Integer port)
+ TestClient(Integer port, String username)
{
webClient = WebClient.create("http://localhost:" + port);
+ user = new User(username);
}
}