import de.juplo.kafka.chat.backend.api.ChatRoomInfoTo;
import de.juplo.kafka.chat.backend.api.MessageTo;
-import lombok.EqualsAndHashCode;
-import lombok.Getter;
-import lombok.Setter;
-import lombok.ToString;
import lombok.extern.slf4j.Slf4j;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
-import org.springframework.core.ParameterizedTypeReference;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
-import org.springframework.http.codec.ServerSentEvent;
import org.springframework.web.reactive.function.client.WebClient;
-import org.testcontainers.images.ImagePullPolicy;
import org.testcontainers.junit.jupiter.Testcontainers;
+import pl.rzrz.assertj.reactor.Assertions;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
-import reactor.util.retry.Retry;
-import java.time.Duration;
-import java.util.stream.IntStream;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
@Testcontainers
@Slf4j
public abstract class AbstractHandoverIT
{
- static final ImagePullPolicy NEVER_PULL = imageName -> false;
- static final ParameterizedTypeReference<ServerSentEvent<String>> SSE_TYPE = new ParameterizedTypeReference<>() {};
+ static final int NUM_CHATROOMS = 23;
+ static final int NUM_CLIENTS = 17;
+ private final AbstractHandoverITContainers containers;
+
+
+ AbstractHandoverIT(AbstractHandoverITContainers containers)
+ {
+ this.containers = containers;
+ }
+
+
+ @Disabled
@Test
void test() throws InterruptedException
{
- ChatRoomInfoTo chatRoom = createChatRoom("bar").block();
- User user = new User("nerd");
- IntStream
- .rangeClosed(1,100)
- .mapToObj(i ->sendMessage(chatRoom, user, "Message #" + i))
- .map(result -> result
- .map(MessageTo::toString)
- .retryWhen(Retry.fixedDelay(10, Duration.ofSeconds(1)))
- .block())
- .forEach(result -> log.info("{}", result));
-
- receiveMessages(chatRoom)
- .take(100)
- .doOnNext(message -> log.info("message: {}", message))
- .then()
- .block();
+ ChatRoomInfoTo[] chatRooms = Flux
+ .range(0, NUM_CHATROOMS)
+ .flatMap(i -> createChatRoom("room-" + i))
+ .toStream()
+ .toArray(size -> new ChatRoomInfoTo[size]);
+
+ int port = containers.haproxy.getMappedPort(8400);
+
+ CompletableFuture<Void>[] testWriterFutures = new CompletableFuture[NUM_CLIENTS];
+ TestWriter[] testWriters = new TestWriter[NUM_CLIENTS];
+ for (int i = 0; i < NUM_CLIENTS; i++)
+ {
+ TestWriter testWriter = new TestWriter(
+ port,
+ chatRooms[i % NUM_CHATROOMS],
+ "user-" + i);
+ testWriters[i] = testWriter;
+ testWriterFutures[i] = testWriter
+ .run()
+ .toFuture();
+ }
+
+ TestListener testListener = new TestListener(port, chatRooms);
+ testListener
+ .run()
+ .subscribe(message -> log.info(
+ "Received message: {}",
+ message));
+
+ log.info("Sleeping for 3 seconds...");
+ Thread.sleep(3000);
+
+ log.info("Starting Backend 2...");
+ containers.backend2.start();
+ log.info("Backend 2 started");
+
+ log.info("Sleeping for 3 seconds...");
+ Thread.sleep(3000);
+
+ log.info("Starting Backend 3...");
+ containers.backend3.start();
+ log.info("Backend 3 started");
+
+ log.info("Sleeping for 3 seconds...");
+ Thread.sleep(3000);
+
+ log.info("Shutting down writers...");
+ for (int i = 0; i < NUM_CLIENTS; i++)
+ {
+ log.info("Shutting down writer {}", i);
+ testWriters[i].running = false;
+ testWriterFutures[i].join();
+ log.info("Joined TestWriter {}", testWriters[i].user);
+ }
+
+ // Yield the work, so that the last messages can be received
+ Thread.sleep(500);
+
+ for (int i = 0; i < NUM_CLIENTS; i++)
+ {
+ TestWriter testWriter = testWriters[i];
+ ChatRoomInfoTo chatRoom = testWriter.chatRoom;
+ List<MessageTo> receivedMessages = testListener.receivedMessages.get(chatRoom.getId());
+
+ Assertions.assertThat(receivedMessages
+ .stream()
+ .filter(message -> message.getUser().equals(testWriter.user.getName()))
+ ).containsExactlyElementsOf(testWriter.sentMessages);
+ }
}
Mono<ChatRoomInfoTo> createChatRoom(String name)
});
}
- Mono<MessageTo> sendMessage(
- ChatRoomInfoTo chatRoom,
- User user,
- String message)
- {
- return webClient
- .put()
- .uri(
- "/{chatRoomId}/{username}/{serial}",
- chatRoom.getId(),
- user.getName(),
- user.nextSerial())
- .contentType(MediaType.TEXT_PLAIN)
- .accept(MediaType.APPLICATION_JSON)
- .bodyValue(message)
- .exchangeToMono(response ->
- {
- if (response.statusCode().equals(HttpStatus.OK))
- {
- return response.bodyToMono(MessageTo.class);
- }
- else
- {
- return response.createError();
- }
- });
- }
-
- Flux<ServerSentEvent<String>> receiveMessages(ChatRoomInfoTo chatRoom)
- {
- return webClient
- .get()
- .uri(
- "/{chatRoomId}/listen",
- chatRoom.getId())
- .accept(MediaType.TEXT_EVENT_STREAM)
- .retrieve()
- .bodyToFlux(SSE_TYPE);
- }
-
WebClient webClient;
-
- @EqualsAndHashCode
- @ToString
- class User
+ @BeforeEach
+ void setUp() throws Exception
{
- @Getter
- private final String name;
- private int serial = 0;
-
-
- User (String name)
- {
- this.name = name;
- }
+ containers.setUp();
-
- int nextSerial()
- {
- return ++serial;
- }
- }
-
- @Getter
- @Setter
- static class StatusTo
- {
- String status;
+ Integer port = containers.haproxy.getMappedPort(8400);
+ webClient = WebClient.create("http://localhost:" + port);
}
}