+++ /dev/null
-package de.juplo.kafka.chat.backend;
-
-import com.fasterxml.jackson.core.type.TypeReference;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import de.juplo.kafka.chat.backend.api.ChatRoomInfoTo;
-import de.juplo.kafka.chat.backend.api.MessageTo;
-import lombok.extern.slf4j.Slf4j;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.DisplayName;
-import org.junit.jupiter.api.Test;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.beans.factory.annotation.Value;
-import org.springframework.core.io.Resource;
-import org.springframework.http.MediaType;
-import org.springframework.test.annotation.DirtiesContext;
-import org.springframework.test.web.reactive.server.WebTestClient;
-import org.testcontainers.shaded.org.awaitility.Awaitility;
-import reactor.core.publisher.Flux;
-
-import java.io.IOException;
-import java.time.Duration;
-import java.util.List;
-import java.util.UUID;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.random.RandomGenerator;
-
-import static org.assertj.core.api.Assertions.assertThat;
-import static org.hamcrest.Matchers.endsWith;
-
-
-@Slf4j
-@DirtiesContext
-public abstract class AbstractConfigurationIT
-{
- final static String EXISTING_CHATROOM = "5c73531c-6fc4-426c-adcb-afc5c140a0f7";
- String NONEXISTENT_CHATROOM = "7f59ec77-832e-4a17-8d22-55ef46242c17";
-
-
- @Autowired
- WebTestClient webTestClient;
- @Autowired
- ObjectMapper objectMapper;
-
- @Value("classpath:data/files/5c73531c-6fc4-426c-adcb-afc5c140a0f7.json")
- Resource existingChatRoomRessource;
- MessageTo[] expectedExistingMessages;
-
-
- @BeforeEach
- void waitForApp() throws IOException
- {
- expectedExistingMessages = objectMapper
- .readValue(
- existingChatRoomRessource.getInputStream(),
- new TypeReference<List<MessageTo>>() {})
- .toArray(size -> new MessageTo[size]);
-
- Awaitility
- .await()
- .atMost(Duration.ofSeconds(15))
- .untilAsserted(() ->
- {
- webTestClient
- .get()
- .uri("/actuator/health")
- .exchange()
- .expectStatus().isOk()
- .expectBody().jsonPath("$.status").isEqualTo("UP");
- });
- }
-
- @Test
- @DisplayName("Restored chat-rooms can be listed")
- void testRestoredChatRoomsCanBeListed()
- {
- Awaitility
- .await()
- .atMost(Duration.ofSeconds(15))
- .untilAsserted(() ->
- {
- AtomicBoolean existingChatRoomFound = new AtomicBoolean(false);
- webTestClient
- .get()
- .uri("/list")
- .accept(MediaType.APPLICATION_JSON)
- .exchange()
- .expectStatus().isOk()
- .returnResult(ChatRoomInfoTo.class)
- .getResponseBody()
- .toIterable()
- .forEach(chatRoomInfoTo ->
- {
- log.debug("Inspecting chat-room {}", chatRoomInfoTo);
- if (chatRoomInfoTo.getId().equals(UUID.fromString(EXISTING_CHATROOM)))
- {
- log.debug("Found existing chat-room {}", chatRoomInfoTo);
- existingChatRoomFound.set(true);
- assertThat(chatRoomInfoTo.getName().equals("FOO"));
- }
- });
- assertThat(existingChatRoomFound.get()).isTrue();
- });
- }
-
- @Test
- @DisplayName("Details as expected for restored chat-room")
- void testRestoredChatRoomHasExpectedDetails()
- {
- Awaitility
- .await()
- .atMost(Duration.ofSeconds(15))
- .untilAsserted(() ->
- {
- webTestClient
- .get()
- .uri("/{chatRoomId}", EXISTING_CHATROOM)
- .accept(MediaType.APPLICATION_JSON)
- .exchange()
- .expectStatus().isOk()
- .expectBody().jsonPath("$.name").isEqualTo("FOO");
- });
- }
-
- @Test
- @DisplayName("Restored message from Ute has expected Text")
- void testRestoredMessageForUteHasExpectedText()
- {
- Awaitility
- .await()
- .atMost(Duration.ofSeconds(15))
- .untilAsserted(() ->
- {
- webTestClient
- .get()
- .uri("/{chatRoomId}/ute/1", EXISTING_CHATROOM)
- .accept(MediaType.APPLICATION_JSON)
- .exchange()
- .expectStatus().isOk()
- .expectBody().jsonPath("$.text").isEqualTo("Ich bin Ute...");
- });
- }
-
- @Test
- @DisplayName("Restored message from Peter has expected Text")
- void testRestoredMessageForPeterHasExpectedText()
- {
- Awaitility
- .await()
- .atMost(Duration.ofSeconds(15))
- .untilAsserted(() ->
- {
- webTestClient
- .get()
- .uri("/{chatRoomId}/peter/1", EXISTING_CHATROOM)
- .accept(MediaType.APPLICATION_JSON)
- .exchange()
- .expectStatus().isOk()
- .expectBody().jsonPath("$.text").isEqualTo("Hallo, ich heiße Peter!");
- });
- }
-
- @Test
- @DisplayName("A PUT-message for a non-existent chat-room yields 404 NOT FOUND")
- void testNotFoundForPutMessageToNonExistentChatRoom()
- {
- Awaitility
- .await()
- .atMost(Duration.ofSeconds(15))
- .untilAsserted(() ->
- {
- webTestClient
- .put()
- .uri("/{chatRoomId}/otto/66", NONEXISTENT_CHATROOM)
- .contentType(MediaType.TEXT_PLAIN)
- .accept(MediaType.APPLICATION_JSON)
- .bodyValue("The devil rules route 66")
- .exchange()
- .expectStatus().isNotFound()
- .expectBody()
- .jsonPath("$.type").value(endsWith("/problem/unknown-chatroom"))
- .jsonPath("$.chatroomId").isEqualTo(NONEXISTENT_CHATROOM);
- });
- }
-
- @Test
- @DisplayName("A message can be put into a newly created chat-room")
- void testPutMessageInNewChatRoom() throws IOException
- {
- ChatRoomInfoTo chatRoomInfo;
- do
- {
- // The first request creates a new chat-room
- // It must be repeated, until a chat-room was created,
- // that is owned by the instance
- chatRoomInfo = webTestClient
- .post()
- .uri("/create")
- .contentType(MediaType.TEXT_PLAIN)
- .bodyValue("bar")
- .accept(MediaType.APPLICATION_JSON)
- .exchange()
- .returnResult(ChatRoomInfoTo.class)
- .getResponseBody()
- .retry(30)
- .blockFirst();
- }
- while(!(chatRoomInfo.getShard() == null || chatRoomInfo.getShard().intValue() == 2));
-
- UUID chatRoomId = chatRoomInfo.getId();
-
- Awaitility
- .await()
- .atMost(Duration.ofSeconds(15))
- .untilAsserted(() ->
- {
- webTestClient
- .put()
- .uri("/{chatRoomId}/nerd/7", chatRoomId)
- .contentType(MediaType.TEXT_PLAIN)
- .accept(MediaType.APPLICATION_JSON)
- .bodyValue("Hello world!")
- .exchange()
- .expectStatus().isOk()
- .expectBody()
- .jsonPath("$.id").isEqualTo(Integer.valueOf(7))
- .jsonPath("$.user").isEqualTo("nerd")
- .jsonPath("$.text").isEqualTo("Hello world!");
- });
- }
-
- @Test
- @DisplayName("Only newly send messages can be seen, when listening to restored chat-room")
- void testListenToRestoredChatRoomYieldsOnlyNewlyAddedMessages()
- {
- MessageTo sentMessage = webTestClient
- .put()
- .uri(
- "/{chatRoomId}/nerd/{messageId}",
- EXISTING_CHATROOM,
- RandomGenerator.getDefault().nextInt())
- .contentType(MediaType.TEXT_PLAIN)
- .accept(MediaType.APPLICATION_JSON)
- .bodyValue("Hello world!")
- .exchange()
- .expectStatus()
- .isOk()
- .returnResult(MessageTo.class)
- .getResponseBody()
- .next()
- .block();
-
- Flux<MessageTo> result = webTestClient
- .get()
- .uri("/{chatRoomId}/listen", EXISTING_CHATROOM)
- .accept(MediaType.TEXT_EVENT_STREAM)
- .exchange()
- .expectStatus().isOk()
- .returnResult(MessageTo.class)
- .getResponseBody();
-
- assertThat(result.next().block()).isEqualTo(sentMessage);
- }
-}
+++ /dev/null
-package de.juplo.kafka.chat.backend;
-
-import org.junit.jupiter.api.DisplayName;
-import org.junit.jupiter.api.Test;
-import org.springframework.http.MediaType;
-import org.testcontainers.shaded.org.awaitility.Awaitility;
-
-import java.time.Duration;
-
-import static org.hamcrest.Matchers.endsWith;
-
-
-public abstract class AbstractConfigurationWithShardingIT extends AbstractConfigurationIT
-{
- @Test
- @DisplayName("A PUT-message for a not owned shard yields 404 - NOT FOUND")
- void testNotFoundForPutMessageToAChatRoomInNotOwnedShard()
- {
- String otherChatRoomId = "4e7246a6-29ae-43ea-b56f-669c3481ac19";
- int shard = 0;
-
- Awaitility
- .await()
- .atMost(Duration.ofSeconds(15))
- .untilAsserted(() ->
- webTestClient
- .put()
- .uri("/{chatRoomId}/otto/66", otherChatRoomId)
- .contentType(MediaType.TEXT_PLAIN)
- .accept(MediaType.APPLICATION_JSON)
- .bodyValue("The devil rules route 66")
- .exchange()
- .expectStatus().is5xxServerError()
- .expectBody()
- .jsonPath("$.type").value(endsWith("/problem/shard-not-owned"))
- .jsonPath("$.shard").isEqualTo(shard));
- }
-}
+++ /dev/null
-package de.juplo.kafka.chat.backend;
-
-import org.springframework.boot.test.context.SpringBootTest;
-
-
-@SpringBootTest(
- webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT,
- properties = {
- "chat.backend.inmemory.storage-strategy=files",
- "chat.backend.inmemory.storage-directory=target/test-classes/data/files",
- "chat.backend.inmemory.sharding-strategy=kafkalike",
- "chat.backend.inmemory.num-shards=10",
- "chat.backend.inmemory.owned-shards=2" })
-class InMemoryWithFilesAndShardingConfigurationIT extends AbstractConfigurationWithShardingIT
-{
-}
+++ /dev/null
-package de.juplo.kafka.chat.backend;
-
-import org.springframework.boot.test.context.SpringBootTest;
-
-
-@SpringBootTest(
- webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT,
- properties = {
- "chat.backend.inmemory.sharding-strategy=none",
- "chat.backend.inmemory.storage-strategy=files",
- "chat.backend.inmemory.storage-directory=target/test-classes/data/files" })
-class InMemoryWithFilesConfigurationIT extends AbstractConfigurationIT
-{
-}
+++ /dev/null
-package de.juplo.kafka.chat.backend;
-
-import lombok.extern.slf4j.Slf4j;
-import org.junit.jupiter.api.BeforeEach;
-import org.springframework.boot.test.context.SpringBootTest;
-import org.springframework.test.context.DynamicPropertyRegistry;
-import org.springframework.test.context.DynamicPropertySource;
-import org.testcontainers.containers.BindMode;
-import org.testcontainers.containers.GenericContainer;
-import org.testcontainers.containers.output.Slf4jLogConsumer;
-import org.testcontainers.junit.jupiter.Container;
-import org.testcontainers.junit.jupiter.Testcontainers;
-
-
-@SpringBootTest(
- webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT,
- properties = {
- "spring.data.mongodb.host=localhost",
- "spring.data.mongodb.database=test",
- "chat.backend.inmemory.sharding-strategy=none",
- "chat.backend.inmemory.storage-strategy=mongodb" })
-@Testcontainers
-@Slf4j
-class InMemoryWithMongoDbConfigurationIT extends AbstractConfigurationIT
-{
- private static final int MONGODB_PORT = 27017;
-
- @Container
- private static final GenericContainer CONTAINER =
- new GenericContainer("mongo:6")
- .withClasspathResourceMapping(
- "data/mongodb",
- "/docker-entrypoint-initdb.d",
- BindMode.READ_ONLY)
- .withExposedPorts(MONGODB_PORT);
-
- @DynamicPropertySource
- static void addMongoPortProperty(DynamicPropertyRegistry registry)
- {
- registry.add("spring.data.mongodb.port", () -> CONTAINER.getMappedPort(MONGODB_PORT));
- }
-
- @BeforeEach
- void setUpLogging()
- {
- Slf4jLogConsumer logConsumer = new Slf4jLogConsumer(log);
- CONTAINER.followOutput(logConsumer);
- }
-}
+++ /dev/null
-package de.juplo.kafka.chat.backend;
-
-import org.junit.jupiter.api.Disabled;
-import org.springframework.boot.test.context.SpringBootTest;
-
-
-@SpringBootTest(
- webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT,
- properties = {
- "chat.backend.inmemory.storage-strategy=none",
- "chat.backend.inmemory.sharding-strategy=kafkalike",
- "chat.backend.inmemory.num-shards=10",
- "chat.backend.inmemory.owned-shards=2" })
-class InMemoryWithNoStorageAndShardingConfigurationIT extends AbstractConfigurationWithShardingIT
-{
- @Override
- @Disabled("Chat-Rooms cannot be restored, if storage is disabled")
- void testRestoredChatRoomsCanBeListed() {}
-
- @Override
- @Disabled("Chat-Rooms cannot be restored, if storage is disabled")
- void testRestoredChatRoomHasExpectedDetails() {}
-
- @Override
- @Disabled("Chat-Rooms cannot be restored, if storage is disabled")
- void testRestoredMessageForUteHasExpectedText() {}
-
- @Override
- @Disabled("Chat-Rooms cannot be restored, if storage is disabled")
- void testRestoredMessageForPeterHasExpectedText() {}
-
- @Override
- @Disabled("Chat-Rooms cannot be restored, if storage is disabled")
- void testListenToRestoredChatRoomYieldsOnlyNewlyAddedMessages() {}
-}
+++ /dev/null
-package de.juplo.kafka.chat.backend;
-
-import org.junit.jupiter.api.Disabled;
-import org.springframework.boot.test.context.SpringBootTest;
-
-
-@SpringBootTest(
- webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT,
- properties = {
- "chat.backend.inmemory.sharding-strategy=none",
- "chat.backend.inmemory.storage-strategy=none" })
-class InMemoryWithNoStorageConfigurationIT extends AbstractConfigurationIT
-{
- @Override
- @Disabled("Chat-Rooms cannot be restored, if storage is disabled")
- void testRestoredChatRoomsCanBeListed() {}
-
- @Override
- @Disabled("Chat-Rooms cannot be restored, if storage is disabled")
- void testRestoredChatRoomHasExpectedDetails() {}
-
- @Override
- @Disabled("Chat-Rooms cannot be restored, if storage is disabled")
- void testRestoredMessageForUteHasExpectedText() {}
-
- @Override
- @Disabled("Chat-Rooms cannot be restored, if storage is disabled")
- void testRestoredMessageForPeterHasExpectedText() {}
-
- @Override
- @Disabled("Chat-Rooms cannot be restored, if storage is disabled")
- void testListenToRestoredChatRoomYieldsOnlyNewlyAddedMessages() {}
-}
+++ /dev/null
-package de.juplo.kafka.chat.backend;
-
-import de.juplo.kafka.chat.backend.implementation.kafka.ChannelTaskExecutor;
-import de.juplo.kafka.chat.backend.implementation.kafka.KafkaTestUtils;
-import lombok.extern.slf4j.Slf4j;
-import org.junit.jupiter.api.BeforeAll;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.boot.test.context.SpringBootTest;
-import org.springframework.boot.test.context.TestConfiguration;
-import org.springframework.context.annotation.Import;
-import org.springframework.kafka.core.KafkaTemplate;
-import org.springframework.kafka.test.context.EmbeddedKafka;
-
-import static de.juplo.kafka.chat.backend.KafkaConfigurationIT.DATA_TOPIC;
-import static de.juplo.kafka.chat.backend.KafkaConfigurationIT.INFO_TOPIC;
-
-
-@SpringBootTest(
- webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT,
- properties = {
- "spring.main.allow-bean-definition-overriding=true",
- "chat.backend.services=kafka",
- "chat.backend.kafka.client-id-PREFIX=TEST",
- "chat.backend.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}",
- "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}",
- "chat.backend.kafka.info-channel-topic=" + INFO_TOPIC,
- "chat.backend.kafka.data-channel-topic=" + DATA_TOPIC,
- "chat.backend.kafka.num-partitions=10",
- })
-@EmbeddedKafka(
- topics = { INFO_TOPIC, DATA_TOPIC },
- partitions = 10)
-@Slf4j
-class KafkaConfigurationIT extends AbstractConfigurationWithShardingIT
-{
- final static String INFO_TOPIC = "KAFKA_CONFIGURATION_IT_INFO_CHANNEL";
- final static String DATA_TOPIC = "KAFKA_CONFIGURATION_IT_DATA_CHANNEL";
-
- @BeforeAll
- static void sendAndLoadStoredData(
- @Autowired KafkaTemplate<String, String> messageTemplate,
- @Autowired ChannelTaskExecutor infoChannelTaskExecutor,
- @Autowired ChannelTaskExecutor dataChannelTaskExecutor)
- {
- KafkaTestUtils.initKafkaSetup(
- INFO_TOPIC,
- DATA_TOPIC,
- messageTemplate,
- infoChannelTaskExecutor,
- dataChannelTaskExecutor);
- }
-
-
- @TestConfiguration
- @Import(KafkaTestUtils.KafkaTestConfiguration.class)
- static class KafkaConfigurationITConfiguration
- {
- }
-}
--- /dev/null
+package de.juplo.kafka.chat.backend;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import de.juplo.kafka.chat.backend.api.ChatRoomInfoTo;
+import de.juplo.kafka.chat.backend.api.MessageTo;
+import lombok.extern.slf4j.Slf4j;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Test;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.core.io.Resource;
+import org.springframework.http.MediaType;
+import org.springframework.test.annotation.DirtiesContext;
+import org.springframework.test.web.reactive.server.WebTestClient;
+import org.testcontainers.shaded.org.awaitility.Awaitility;
+import reactor.core.publisher.Flux;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.random.RandomGenerator;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.hamcrest.Matchers.endsWith;
+
+
+@Slf4j
+@DirtiesContext
+public abstract class AbstractConfigurationIT
+{
+ final static String EXISTING_CHATROOM = "5c73531c-6fc4-426c-adcb-afc5c140a0f7";
+ String NONEXISTENT_CHATROOM = "7f59ec77-832e-4a17-8d22-55ef46242c17";
+
+
+ @Autowired
+ WebTestClient webTestClient;
+ @Autowired
+ ObjectMapper objectMapper;
+
+ @Value("classpath:data/files/5c73531c-6fc4-426c-adcb-afc5c140a0f7.json")
+ Resource existingChatRoomRessource;
+ MessageTo[] expectedExistingMessages;
+
+
+ @BeforeEach
+ void waitForApp() throws IOException
+ {
+ expectedExistingMessages = objectMapper
+ .readValue(
+ existingChatRoomRessource.getInputStream(),
+ new TypeReference<List<MessageTo>>() {})
+ .toArray(size -> new MessageTo[size]);
+
+ Awaitility
+ .await()
+ .atMost(Duration.ofSeconds(15))
+ .untilAsserted(() ->
+ {
+ webTestClient
+ .get()
+ .uri("/actuator/health")
+ .exchange()
+ .expectStatus().isOk()
+ .expectBody().jsonPath("$.status").isEqualTo("UP");
+ });
+ }
+
+ @Test
+ @DisplayName("Restored chat-rooms can be listed")
+ void testRestoredChatRoomsCanBeListed()
+ {
+ Awaitility
+ .await()
+ .atMost(Duration.ofSeconds(15))
+ .untilAsserted(() ->
+ {
+ AtomicBoolean existingChatRoomFound = new AtomicBoolean(false);
+ webTestClient
+ .get()
+ .uri("/list")
+ .accept(MediaType.APPLICATION_JSON)
+ .exchange()
+ .expectStatus().isOk()
+ .returnResult(ChatRoomInfoTo.class)
+ .getResponseBody()
+ .toIterable()
+ .forEach(chatRoomInfoTo ->
+ {
+ log.debug("Inspecting chat-room {}", chatRoomInfoTo);
+ if (chatRoomInfoTo.getId().equals(UUID.fromString(EXISTING_CHATROOM)))
+ {
+ log.debug("Found existing chat-room {}", chatRoomInfoTo);
+ existingChatRoomFound.set(true);
+ assertThat(chatRoomInfoTo.getName().equals("FOO"));
+ }
+ });
+ assertThat(existingChatRoomFound.get()).isTrue();
+ });
+ }
+
+ @Test
+ @DisplayName("Details as expected for restored chat-room")
+ void testRestoredChatRoomHasExpectedDetails()
+ {
+ Awaitility
+ .await()
+ .atMost(Duration.ofSeconds(15))
+ .untilAsserted(() ->
+ {
+ webTestClient
+ .get()
+ .uri("/{chatRoomId}", EXISTING_CHATROOM)
+ .accept(MediaType.APPLICATION_JSON)
+ .exchange()
+ .expectStatus().isOk()
+ .expectBody().jsonPath("$.name").isEqualTo("FOO");
+ });
+ }
+
+ @Test
+ @DisplayName("Restored message from Ute has expected Text")
+ void testRestoredMessageForUteHasExpectedText()
+ {
+ Awaitility
+ .await()
+ .atMost(Duration.ofSeconds(15))
+ .untilAsserted(() ->
+ {
+ webTestClient
+ .get()
+ .uri("/{chatRoomId}/ute/1", EXISTING_CHATROOM)
+ .accept(MediaType.APPLICATION_JSON)
+ .exchange()
+ .expectStatus().isOk()
+ .expectBody().jsonPath("$.text").isEqualTo("Ich bin Ute...");
+ });
+ }
+
+ @Test
+ @DisplayName("Restored message from Peter has expected Text")
+ void testRestoredMessageForPeterHasExpectedText()
+ {
+ Awaitility
+ .await()
+ .atMost(Duration.ofSeconds(15))
+ .untilAsserted(() ->
+ {
+ webTestClient
+ .get()
+ .uri("/{chatRoomId}/peter/1", EXISTING_CHATROOM)
+ .accept(MediaType.APPLICATION_JSON)
+ .exchange()
+ .expectStatus().isOk()
+ .expectBody().jsonPath("$.text").isEqualTo("Hallo, ich heiße Peter!");
+ });
+ }
+
+ @Test
+ @DisplayName("A PUT-message for a non-existent chat-room yields 404 NOT FOUND")
+ void testNotFoundForPutMessageToNonExistentChatRoom()
+ {
+ Awaitility
+ .await()
+ .atMost(Duration.ofSeconds(15))
+ .untilAsserted(() ->
+ {
+ webTestClient
+ .put()
+ .uri("/{chatRoomId}/otto/66", NONEXISTENT_CHATROOM)
+ .contentType(MediaType.TEXT_PLAIN)
+ .accept(MediaType.APPLICATION_JSON)
+ .bodyValue("The devil rules route 66")
+ .exchange()
+ .expectStatus().isNotFound()
+ .expectBody()
+ .jsonPath("$.type").value(endsWith("/problem/unknown-chatroom"))
+ .jsonPath("$.chatroomId").isEqualTo(NONEXISTENT_CHATROOM);
+ });
+ }
+
+ @Test
+ @DisplayName("A message can be put into a newly created chat-room")
+ void testPutMessageInNewChatRoom() throws IOException
+ {
+ ChatRoomInfoTo chatRoomInfo;
+ do
+ {
+ // The first request creates a new chat-room
+ // It must be repeated, until a chat-room was created,
+ // that is owned by the instance
+ chatRoomInfo = webTestClient
+ .post()
+ .uri("/create")
+ .contentType(MediaType.TEXT_PLAIN)
+ .bodyValue("bar")
+ .accept(MediaType.APPLICATION_JSON)
+ .exchange()
+ .returnResult(ChatRoomInfoTo.class)
+ .getResponseBody()
+ .retry(30)
+ .blockFirst();
+ }
+ while(!(chatRoomInfo.getShard() == null || chatRoomInfo.getShard().intValue() == 2));
+
+ UUID chatRoomId = chatRoomInfo.getId();
+
+ Awaitility
+ .await()
+ .atMost(Duration.ofSeconds(15))
+ .untilAsserted(() ->
+ {
+ webTestClient
+ .put()
+ .uri("/{chatRoomId}/nerd/7", chatRoomId)
+ .contentType(MediaType.TEXT_PLAIN)
+ .accept(MediaType.APPLICATION_JSON)
+ .bodyValue("Hello world!")
+ .exchange()
+ .expectStatus().isOk()
+ .expectBody()
+ .jsonPath("$.id").isEqualTo(Integer.valueOf(7))
+ .jsonPath("$.user").isEqualTo("nerd")
+ .jsonPath("$.text").isEqualTo("Hello world!");
+ });
+ }
+
+ @Test
+ @DisplayName("Only newly send messages can be seen, when listening to restored chat-room")
+ void testListenToRestoredChatRoomYieldsOnlyNewlyAddedMessages()
+ {
+ MessageTo sentMessage = webTestClient
+ .put()
+ .uri(
+ "/{chatRoomId}/nerd/{messageId}",
+ EXISTING_CHATROOM,
+ RandomGenerator.getDefault().nextInt())
+ .contentType(MediaType.TEXT_PLAIN)
+ .accept(MediaType.APPLICATION_JSON)
+ .bodyValue("Hello world!")
+ .exchange()
+ .expectStatus()
+ .isOk()
+ .returnResult(MessageTo.class)
+ .getResponseBody()
+ .next()
+ .block();
+
+ Flux<MessageTo> result = webTestClient
+ .get()
+ .uri("/{chatRoomId}/listen", EXISTING_CHATROOM)
+ .accept(MediaType.TEXT_EVENT_STREAM)
+ .exchange()
+ .expectStatus().isOk()
+ .returnResult(MessageTo.class)
+ .getResponseBody();
+
+ assertThat(result.next().block()).isEqualTo(sentMessage);
+ }
+}
--- /dev/null
+package de.juplo.kafka.chat.backend;
+
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Test;
+import org.springframework.http.MediaType;
+import org.testcontainers.shaded.org.awaitility.Awaitility;
+
+import java.time.Duration;
+
+import static org.hamcrest.Matchers.endsWith;
+
+
+public abstract class AbstractConfigurationWithShardingIT extends AbstractConfigurationIT
+{
+ @Test
+ @DisplayName("A PUT-message for a not owned shard yields 404 - NOT FOUND")
+ void testNotFoundForPutMessageToAChatRoomInNotOwnedShard()
+ {
+ String otherChatRoomId = "4e7246a6-29ae-43ea-b56f-669c3481ac19";
+ int shard = 0;
+
+ Awaitility
+ .await()
+ .atMost(Duration.ofSeconds(15))
+ .untilAsserted(() ->
+ webTestClient
+ .put()
+ .uri("/{chatRoomId}/otto/66", otherChatRoomId)
+ .contentType(MediaType.TEXT_PLAIN)
+ .accept(MediaType.APPLICATION_JSON)
+ .bodyValue("The devil rules route 66")
+ .exchange()
+ .expectStatus().is5xxServerError()
+ .expectBody()
+ .jsonPath("$.type").value(endsWith("/problem/shard-not-owned"))
+ .jsonPath("$.shard").isEqualTo(shard));
+ }
+}
--- /dev/null
+package de.juplo.kafka.chat.backend;
+
+import org.springframework.boot.test.context.SpringBootTest;
+
+
+@SpringBootTest(
+ webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT,
+ properties = {
+ "chat.backend.inmemory.storage-strategy=files",
+ "chat.backend.inmemory.storage-directory=target/test-classes/data/files",
+ "chat.backend.inmemory.sharding-strategy=kafkalike",
+ "chat.backend.inmemory.num-shards=10",
+ "chat.backend.inmemory.owned-shards=2" })
+class InMemoryWithFilesAndShardingConfigurationIT extends AbstractConfigurationWithShardingIT
+{
+}
--- /dev/null
+package de.juplo.kafka.chat.backend;
+
+import org.springframework.boot.test.context.SpringBootTest;
+
+
+@SpringBootTest(
+ webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT,
+ properties = {
+ "chat.backend.inmemory.sharding-strategy=none",
+ "chat.backend.inmemory.storage-strategy=files",
+ "chat.backend.inmemory.storage-directory=target/test-classes/data/files" })
+class InMemoryWithFilesConfigurationIT extends AbstractConfigurationIT
+{
+}
--- /dev/null
+package de.juplo.kafka.chat.backend;
+
+import lombok.extern.slf4j.Slf4j;
+import org.junit.jupiter.api.BeforeEach;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.test.context.DynamicPropertyRegistry;
+import org.springframework.test.context.DynamicPropertySource;
+import org.testcontainers.containers.BindMode;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+
+
+@SpringBootTest(
+ webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT,
+ properties = {
+ "spring.data.mongodb.host=localhost",
+ "spring.data.mongodb.database=test",
+ "chat.backend.inmemory.sharding-strategy=none",
+ "chat.backend.inmemory.storage-strategy=mongodb" })
+@Testcontainers
+@Slf4j
+class InMemoryWithMongoDbConfigurationIT extends AbstractConfigurationIT
+{
+ private static final int MONGODB_PORT = 27017;
+
+ @Container
+ private static final GenericContainer CONTAINER =
+ new GenericContainer("mongo:6")
+ .withClasspathResourceMapping(
+ "data/mongodb",
+ "/docker-entrypoint-initdb.d",
+ BindMode.READ_ONLY)
+ .withExposedPorts(MONGODB_PORT);
+
+ @DynamicPropertySource
+ static void addMongoPortProperty(DynamicPropertyRegistry registry)
+ {
+ registry.add("spring.data.mongodb.port", () -> CONTAINER.getMappedPort(MONGODB_PORT));
+ }
+
+ @BeforeEach
+ void setUpLogging()
+ {
+ Slf4jLogConsumer logConsumer = new Slf4jLogConsumer(log);
+ CONTAINER.followOutput(logConsumer);
+ }
+}
--- /dev/null
+package de.juplo.kafka.chat.backend;
+
+import org.junit.jupiter.api.Disabled;
+import org.springframework.boot.test.context.SpringBootTest;
+
+
+@SpringBootTest(
+ webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT,
+ properties = {
+ "chat.backend.inmemory.storage-strategy=none",
+ "chat.backend.inmemory.sharding-strategy=kafkalike",
+ "chat.backend.inmemory.num-shards=10",
+ "chat.backend.inmemory.owned-shards=2" })
+class InMemoryWithNoStorageAndShardingConfigurationIT extends AbstractConfigurationWithShardingIT
+{
+ @Override
+ @Disabled("Chat-Rooms cannot be restored, if storage is disabled")
+ void testRestoredChatRoomsCanBeListed() {}
+
+ @Override
+ @Disabled("Chat-Rooms cannot be restored, if storage is disabled")
+ void testRestoredChatRoomHasExpectedDetails() {}
+
+ @Override
+ @Disabled("Chat-Rooms cannot be restored, if storage is disabled")
+ void testRestoredMessageForUteHasExpectedText() {}
+
+ @Override
+ @Disabled("Chat-Rooms cannot be restored, if storage is disabled")
+ void testRestoredMessageForPeterHasExpectedText() {}
+
+ @Override
+ @Disabled("Chat-Rooms cannot be restored, if storage is disabled")
+ void testListenToRestoredChatRoomYieldsOnlyNewlyAddedMessages() {}
+}
--- /dev/null
+package de.juplo.kafka.chat.backend;
+
+import org.junit.jupiter.api.Disabled;
+import org.springframework.boot.test.context.SpringBootTest;
+
+
+@SpringBootTest(
+ webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT,
+ properties = {
+ "chat.backend.inmemory.sharding-strategy=none",
+ "chat.backend.inmemory.storage-strategy=none" })
+class InMemoryWithNoStorageConfigurationIT extends AbstractConfigurationIT
+{
+ @Override
+ @Disabled("Chat-Rooms cannot be restored, if storage is disabled")
+ void testRestoredChatRoomsCanBeListed() {}
+
+ @Override
+ @Disabled("Chat-Rooms cannot be restored, if storage is disabled")
+ void testRestoredChatRoomHasExpectedDetails() {}
+
+ @Override
+ @Disabled("Chat-Rooms cannot be restored, if storage is disabled")
+ void testRestoredMessageForUteHasExpectedText() {}
+
+ @Override
+ @Disabled("Chat-Rooms cannot be restored, if storage is disabled")
+ void testRestoredMessageForPeterHasExpectedText() {}
+
+ @Override
+ @Disabled("Chat-Rooms cannot be restored, if storage is disabled")
+ void testListenToRestoredChatRoomYieldsOnlyNewlyAddedMessages() {}
+}
--- /dev/null
+package de.juplo.kafka.chat.backend;
+
+import de.juplo.kafka.chat.backend.implementation.kafka.ChannelTaskExecutor;
+import de.juplo.kafka.chat.backend.implementation.kafka.KafkaTestUtils;
+import lombok.extern.slf4j.Slf4j;
+import org.junit.jupiter.api.BeforeAll;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.boot.test.context.TestConfiguration;
+import org.springframework.context.annotation.Import;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.test.context.EmbeddedKafka;
+
+import static de.juplo.kafka.chat.backend.KafkaConfigurationIT.DATA_TOPIC;
+import static de.juplo.kafka.chat.backend.KafkaConfigurationIT.INFO_TOPIC;
+
+
+@SpringBootTest(
+ webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT,
+ properties = {
+ "spring.main.allow-bean-definition-overriding=true",
+ "chat.backend.services=kafka",
+ "chat.backend.kafka.client-id-PREFIX=TEST",
+ "chat.backend.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}",
+ "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}",
+ "chat.backend.kafka.info-channel-topic=" + INFO_TOPIC,
+ "chat.backend.kafka.data-channel-topic=" + DATA_TOPIC,
+ "chat.backend.kafka.num-partitions=10",
+ })
+@EmbeddedKafka(
+ topics = { INFO_TOPIC, DATA_TOPIC },
+ partitions = 10)
+@Slf4j
+class KafkaConfigurationIT extends AbstractConfigurationWithShardingIT
+{
+ final static String INFO_TOPIC = "KAFKA_CONFIGURATION_IT_INFO_CHANNEL";
+ final static String DATA_TOPIC = "KAFKA_CONFIGURATION_IT_DATA_CHANNEL";
+
+ @BeforeAll
+ static void sendAndLoadStoredData(
+ @Autowired KafkaTemplate<String, String> messageTemplate,
+ @Autowired ChannelTaskExecutor infoChannelTaskExecutor,
+ @Autowired ChannelTaskExecutor dataChannelTaskExecutor)
+ {
+ KafkaTestUtils.initKafkaSetup(
+ INFO_TOPIC,
+ DATA_TOPIC,
+ messageTemplate,
+ infoChannelTaskExecutor,
+ dataChannelTaskExecutor);
+ }
+
+
+ @TestConfiguration
+ @Import(KafkaTestUtils.KafkaTestConfiguration.class)
+ static class KafkaConfigurationITConfiguration
+ {
+ }
+}