package de.juplo.kafka.chat.backend;
-import com.fasterxml.jackson.databind.ObjectMapper;
import de.juplo.kafka.chat.backend.api.ChatRoomInfoTo;
+import de.juplo.kafka.chat.backend.api.MessageTo;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.ToString;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
-import org.springframework.test.web.reactive.server.WebTestClient;
+import org.springframework.web.reactive.function.client.WebClient;
import org.testcontainers.containers.BindMode;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.containers.wait.strategy.Wait;
+import org.testcontainers.images.builder.Transferable;
import org.testcontainers.junit.jupiter.Container;
-import org.testcontainers.shaded.org.awaitility.Awaitility;
import org.testcontainers.utility.DockerImageName;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
-import java.time.Duration;
-import java.util.UUID;
+import java.util.stream.IntStream;
@Slf4j
@Test
void test() throws InterruptedException
{
- ObjectMapper objectMapper = new ObjectMapper();
- Awaitility
- .await()
- .atMost(Duration.ofSeconds(15))
- .untilAsserted(() ->
+ ChatRoomInfoTo chatRoom = createChatRoom("bar").block();
+ User user = new User("nerd");
+ IntStream
+ .rangeClosed(1,100)
+ .forEach(i ->sendMessage(chatRoom, user, "Message #" + i));
+
+ Thread.sleep(10000);
+ receiveMessage(chatRoom).subscribe(message -> log.info("message: {}", message));
+ }
+
+ Mono<ChatRoomInfoTo> createChatRoom(String name)
+ {
+ return webClient
+ .post()
+ .uri("/create")
+ .contentType(MediaType.TEXT_PLAIN)
+ .bodyValue(name)
+ .accept(MediaType.APPLICATION_JSON)
+ .exchangeToMono(response ->
+ {
+ if (response.statusCode().equals(HttpStatus.OK))
+ {
+ return response.bodyToMono(ChatRoomInfoTo.class);
+ }
+ else
+ {
+ return response.createError();
+ }
+ });
+ }
+
+ Mono<MessageTo> sendMessage(
+ ChatRoomInfoTo chatRoom,
+ User user,
+ String message)
+ {
+ return webClient
+ .put()
+ .uri(
+ "/{chatRoomId}/{username}/{serial}",
+ chatRoom.getId(),
+ user.getName(),
+ user.nextSerial())
+ .contentType(MediaType.TEXT_PLAIN)
+ .accept(MediaType.APPLICATION_JSON)
+ .bodyValue(message)
+ .exchangeToMono(response ->
{
- byte[] result = webTestClient
- .post()
- .uri("/create")
- .contentType(MediaType.TEXT_PLAIN)
- .bodyValue("bar")
- .accept(MediaType.APPLICATION_JSON)
- .exchange()
- .expectStatus().isOk()
- .expectBody()
- .jsonPath("$.id").exists()
- .jsonPath("$.name").isEqualTo("bar")
- // The hard must not be asserted, because not all implementations ar aware of it
- // .jsonPath("$.shard").isEqualTo(Integer.valueOf(2))
- .returnResult()
- .getResponseBody();
- ChatRoomInfoTo chatRoomInfo = objectMapper.readValue(result, ChatRoomInfoTo.class);
- UUID chatRoomId = chatRoomInfo.getId();
- webTestClient
- .put()
- .uri(
- "/{chatRoomId}/nerd/7",
- chatRoomId)
- .contentType(MediaType.TEXT_PLAIN)
- .accept(MediaType.APPLICATION_JSON)
- .bodyValue("Hello world!")
- .exchange()
- .expectStatus().isOk()
- .expectBody()
- .jsonPath("$.id").isEqualTo(Integer.valueOf(7))
- .jsonPath("$.user").isEqualTo("nerd")
- .jsonPath("$.text").isEqualTo("Hello world!");
+ if (response.statusCode().equals(HttpStatus.OK))
+ {
+ return response.bodyToMono(MessageTo.class);
+ }
+ else
+ {
+ return response.createError();
+ }
});
+ }
- Thread.sleep(30000);
+ Flux<byte[]> receiveMessage(ChatRoomInfoTo chatRoom)
+ {
+ return webClient
+ .get()
+ .uri(
+ "/{chatRoomId}",
+ chatRoom.getId())
+ .accept(MediaType.APPLICATION_OCTET_STREAM)
+ .retrieve().bodyToFlux(byte[].class);
}
@BeforeEach
void setUp()
{
Integer port = haproxy.getMappedPort(8400);
- webTestClient = WebTestClient.bindToServer().baseUrl("http://localhost:" + port).build();
+ webClient = WebClient.create("http://localhost:" + port);
}
- WebTestClient webTestClient;
+ WebClient webClient;
Network network = Network.newNetwork();
.withNetwork(network)
.withNetworkAliases("kafka")
.withListener(() -> "kafka:9999")
- .withEnv("KAFKA_NUM_PARTITIONS", "10")
.withKraft()
.waitingFor(Wait.forLogMessage(".*Kafka\\ Server\\ started.*\\n", 1))
.withLogConsumer(new Slf4jLogConsumer(log, true).withPrefix("KAFKA"));
+ @Container
+ GenericContainer<?> setup =
+ new GenericContainer<>("confluentinc/cp-kcat:7.4.1")
+ .withCreateContainerCmdModifier(cmd -> cmd.withEntrypoint("sh", "-c"))
+ .withNetwork(network)
+ .withCommand(
+ "kafka-topics --bootstrap-server kafka:9999 --create --topic info_channel --partitions 3",
+ "kafka-topics --bootstrap-server kafka:9999 --create --topic data_channel --partitions 10")
+ .dependsOn(kafka)
+ .withLogConsumer(new Slf4jLogConsumer(log, true).withPrefix("SETUP"));
+
@Container
GenericContainer backend_1 =
new GenericContainer(DockerImageName.parse("juplo/chat-backend:0.0.1-SNAPSHOT"))
"--chat.backend.kafka.haproxy-runtime-api=haproxy:8401",
"--chat.backend.kafka.haproxy-map=/usr/local/etc/haproxy/sharding.map"
)
- .dependsOn(kafka)
+ .dependsOn(setup)
.waitingFor(Wait.forLogMessage(".*Started\\ ChatBackendApplication.*\\n", 1))
.withLogConsumer(new Slf4jLogConsumer(log, true).withPrefix("BACKEND-1"));
"--chat.backend.kafka.haproxy-runtime-api=haproxy:8401",
"--chat.backend.kafka.haproxy-map=/usr/local/etc/haproxy/sharding.map"
)
- .dependsOn(kafka)
+ .dependsOn(setup)
.waitingFor(Wait.forLogMessage(".*Started\\ ChatBackendApplication.*\\n", 1))
.withLogConsumer(new Slf4jLogConsumer(log, true).withPrefix("BACKEND-2"));
"--chat.backend.kafka.haproxy-runtime-api=haproxy:8401",
"--chat.backend.kafka.haproxy-map=/usr/local/etc/haproxy/sharding.map"
)
- .dependsOn(kafka)
+ .dependsOn(setup)
.waitingFor(Wait.forLogMessage(".*Started\\ ChatBackendApplication.*\\n", 1))
.withLogConsumer(new Slf4jLogConsumer(log, true).withPrefix("BACKEND-3"));
.withExposedPorts(8400) // , 8401, 8404, 5555)
.dependsOn(backend_1, backend_2, backend_3)
.withLogConsumer(new Slf4jLogConsumer(log, true).withPrefix("HAPROXY"));
+
+ @EqualsAndHashCode
+ @ToString
+ class User
+ {
+ @Getter
+ private final String name;
+ private int serial = 0;
+
+
+ User (String name)
+ {
+ this.name = name;
+ }
+
+
+ int nextSerial()
+ {
+ return ++serial;
+ }
+ }
}