package de.juplo.kafka.chat.backend;
-import de.juplo.kafka.chat.backend.implementation.kafka.ConsumerTaskRunner;
-import de.juplo.kafka.chat.backend.implementation.kafka.KafkaServicesApplicationRunner;
-import de.juplo.kafka.chat.backend.implementation.kafka.KafkaTestUtils;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import de.juplo.kafka.chat.backend.api.ChatRoomInfoTo;
import lombok.extern.slf4j.Slf4j;
-import org.junit.jupiter.api.AfterAll;
-import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.boot.test.context.SpringBootTest;
-import org.springframework.boot.test.context.TestConfiguration;
-import org.springframework.boot.test.mock.mockito.MockBean;
-import org.springframework.context.annotation.Import;
-import org.springframework.kafka.core.KafkaTemplate;
-import org.springframework.kafka.test.context.EmbeddedKafka;
+import org.springframework.http.MediaType;
+import org.springframework.test.web.reactive.server.WebTestClient;
import org.testcontainers.containers.BindMode;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.containers.output.Slf4jLogConsumer;
-import org.testcontainers.images.ImagePullPolicy;
import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.shaded.org.awaitility.Awaitility;
import org.testcontainers.utility.DockerImageName;
-import static de.juplo.kafka.chat.backend.KafkaConfigurationIT.DATA_TOPIC;
-import static de.juplo.kafka.chat.backend.KafkaConfigurationIT.INFO_TOPIC;
+import java.time.Duration;
+import java.util.UUID;
@Slf4j
class KafkaHandoverIT extends AbstractHandoverIT
{
- Network network = Network.newNetwork();
+ @Test
+ void test() throws InterruptedException
+ {
+ ObjectMapper objectMapper = new ObjectMapper();
+ Awaitility
+ .await()
+ .atMost(Duration.ofSeconds(15))
+ .untilAsserted(() ->
+ {
+ byte[] result = webTestClient
+ .post()
+ .uri("/create")
+ .contentType(MediaType.TEXT_PLAIN)
+ .bodyValue("bar")
+ .accept(MediaType.APPLICATION_JSON)
+ .exchange()
+ .expectStatus().isOk()
+ .expectBody()
+ .jsonPath("$.id").exists()
+ .jsonPath("$.name").isEqualTo("bar")
+ // The hard must not be asserted, because not all implementations ar aware of it
+ // .jsonPath("$.shard").isEqualTo(Integer.valueOf(2))
+ .returnResult()
+ .getResponseBody();
+ ChatRoomInfoTo chatRoomInfo = objectMapper.readValue(result, ChatRoomInfoTo.class);
+ UUID chatRoomId = chatRoomInfo.getId();
+ webTestClient
+ .put()
+ .uri(
+ "/{chatRoomId}/nerd/7",
+ chatRoomId)
+ .contentType(MediaType.TEXT_PLAIN)
+ .accept(MediaType.APPLICATION_JSON)
+ .bodyValue("Hello world!")
+ .exchange()
+ .expectStatus().isOk()
+ .expectBody()
+ .jsonPath("$.id").isEqualTo(Integer.valueOf(7))
+ .jsonPath("$.user").isEqualTo("nerd")
+ .jsonPath("$.text").isEqualTo("Hello world!");
+ });
+ }
+
+ @BeforeEach
+ void setUp()
+ {
+ Integer port = haproxy.getMappedPort(8400);
+ webTestClient = WebTestClient.bindToServer().baseUrl("http://localhost:" + port).build();
+
+ Awaitility
+ .await()
+ .atMost(Duration.ofSeconds(15))
+ .untilAsserted(() -> webTestClient
+ .get()
+ .uri("/actuator/health")
+ .exchange()
+ .expectStatus().isOk()
+ .expectBody().jsonPath("$.status").isEqualTo("UP"));
+ }
+
+
+ WebTestClient webTestClient;
+ Network network = Network.newNetwork();
@Container
- GenericContainer HAPROXY =
+ GenericContainer haproxy =
new GenericContainer(DockerImageName.parse("haproxytech/haproxy-debian:2.8"))
.withNetwork(network)
.withNetworkAliases("haproxy")
"haproxy.cfg",
"/usr/local/etc/haproxy/haproxy.cfg",
BindMode.READ_ONLY)
- .withExposedPorts(8400, 8401, 8404, 5555)
+ .withClasspathResourceMapping(
+ "sharding.map",
+ "/usr/local/etc/haproxy/sharding.map",
+ BindMode.READ_WRITE)
+ .withExposedPorts(8400) // , 8401, 8404, 5555)
.withLogConsumer(new Slf4jLogConsumer(log, true).withPrefix("HAPROXY"));
@Container
- KafkaContainer KAFKA =
+ KafkaContainer kafka =
new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.4.0"))
.withNetwork(network)
.withNetworkAliases("kafka")
.withListener(() -> "kafka:9999")
+ .withEnv("KAFKA_NUM_PARTITIONS", "10")
.withKraft()
.withLogConsumer(new Slf4jLogConsumer(log, true).withPrefix("KAFKA"));
@Container
- GenericContainer BACKEND_1 =
+ GenericContainer backend_1 =
new GenericContainer(DockerImageName.parse("juplo/chat-backend:0.0.1-SNAPSHOT"))
.withImagePullPolicy(NEVER_PULL)
.withNetwork(network)
.withLogConsumer(new Slf4jLogConsumer(log, true).withPrefix("BACKEND-1"));
@Container
- GenericContainer BACKEND_2 =
+ GenericContainer backend_2 =
new GenericContainer(DockerImageName.parse("juplo/chat-backend:0.0.1-SNAPSHOT"))
.withImagePullPolicy(NEVER_PULL)
.withNetwork(network)
.withLogConsumer(new Slf4jLogConsumer(log, true).withPrefix("BACKEND-2"));
@Container
- GenericContainer BACKEND_3 =
+ GenericContainer backend_3 =
new GenericContainer(DockerImageName.parse("juplo/chat-backend:0.0.1-SNAPSHOT"))
.withImagePullPolicy(NEVER_PULL)
.withNetwork(network)
"--chat.backend.kafka.haproxy-map=/usr/local/etc/haproxy/sharding.map"
)
.withLogConsumer(new Slf4jLogConsumer(log, true).withPrefix("BACKEND-3"));
-
-
- @BeforeAll
- static void setUp()
- {
- }
-
- @Test
- void test() throws InterruptedException
- {
- Thread.sleep(150000);
- }
}