--- /dev/null
+package de.juplo.kafka.chat.backend;
+
+import de.juplo.kafka.chat.backend.api.ChatRoomInfoTo;
+import lombok.extern.slf4j.Slf4j;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Test;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.http.MediaType;
+import org.springframework.test.web.reactive.server.WebTestClient;
+import org.testcontainers.shaded.org.awaitility.Awaitility;
+import reactor.core.publisher.Flux;
+
+import java.io.IOException;
+import java.time.Duration;
+
+import static pl.rzrz.assertj.reactor.Assertions.assertThat;
+
+
+@Slf4j
+public abstract class AbstractApplicationStartupIT
+{
+ @Autowired
+ WebTestClient webTestClient;
+
+
+ @Test
+ @DisplayName("Applications starts when no data is available (fresh install)")
+ void testStartup() throws IOException
+ {
+ Awaitility
+ .await()
+ .atMost(Duration.ofSeconds(15))
+ .untilAsserted(() -> webTestClient
+ .get()
+ .uri("/actuator/health")
+ .exchange()
+ .expectStatus().isOk()
+ .expectBody().jsonPath("$.status").isEqualTo("UP"));
+ }
+
+ @Test
+ @DisplayName("Chat-rooms can be listed and returns an empty list")
+ void testListChatRooms() throws IOException
+ {
+ Awaitility
+ .await()
+ .atMost(Duration.ofSeconds(15))
+ .untilAsserted(() ->
+ {
+ Flux<ChatRoomInfoTo> result = webTestClient
+ .get()
+ .uri("/list")
+ .accept(MediaType.APPLICATION_JSON)
+ .exchange()
+ .expectStatus().isOk()
+ .returnResult(ChatRoomInfoTo.class)
+ .getResponseBody()
+ .doOnNext(chatRoomInfoTo ->
+ {
+ log.debug("Found chat-room {}", chatRoomInfoTo);
+ });
+
+ assertThat(result).emitsExactly();
+ });
+ }
+}
--- /dev/null
+package de.juplo.kafka.chat.backend;
+
+import org.springframework.boot.test.context.SpringBootTest;
+
+
+@SpringBootTest(
+ webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT,
+ properties = {
+ "chat.backend.inmemory.storage-strategy=files",
+ "chat.backend.inmemory.sharding-strategy=kafkalike",
+ "chat.backend.inmemory.num-shards=10",
+ "chat.backend.inmemory.owned-shards=2" })
+class InMemoryWithFilesAndShardingApplicationStartupIT extends AbstractApplicationStartupIT
+{
+}
--- /dev/null
+package de.juplo.kafka.chat.backend;
+
+import org.springframework.boot.test.context.SpringBootTest;
+
+
+@SpringBootTest(
+ webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT,
+ properties = {
+ "chat.backend.inmemory.sharding-strategy=none",
+ "chat.backend.inmemory.storage-strategy=files" })
+class InMemoryWithFilesApplicationStartupIT extends AbstractApplicationStartupIT
+{
+}
--- /dev/null
+package de.juplo.kafka.chat.backend;
+
+import lombok.extern.slf4j.Slf4j;
+import org.junit.jupiter.api.BeforeEach;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.test.context.DynamicPropertyRegistry;
+import org.springframework.test.context.DynamicPropertySource;
+import org.testcontainers.containers.BindMode;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+
+
+@SpringBootTest(
+ webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT,
+ properties = {
+ "spring.data.mongodb.host=localhost",
+ "spring.data.mongodb.database=test",
+ "chat.backend.inmemory.sharding-strategy=none",
+ "chat.backend.inmemory.storage-strategy=mongodb" })
+@Testcontainers
+@Slf4j
+class InMemoryWithMongoDbApplicationStartupIT extends AbstractApplicationStartupIT
+{
+ private static final int MONGODB_PORT = 27017;
+
+ @Container
+ private static final GenericContainer CONTAINER =
+ new GenericContainer("mongo:6").withExposedPorts(MONGODB_PORT);
+
+ @DynamicPropertySource
+ static void addMongoPortProperty(DynamicPropertyRegistry registry)
+ {
+ registry.add("spring.data.mongodb.port", () -> CONTAINER.getMappedPort(MONGODB_PORT));
+ }
+
+ @BeforeEach
+ void setUpLogging()
+ {
+ Slf4jLogConsumer logConsumer = new Slf4jLogConsumer(log);
+ CONTAINER.followOutput(logConsumer);
+ }
+}
--- /dev/null
+package de.juplo.kafka.chat.backend;
+
+import org.springframework.boot.test.context.SpringBootTest;
+
+
+@SpringBootTest(
+ webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT,
+ properties = {
+ "chat.backend.inmemory.storage-strategy=none",
+ "chat.backend.inmemory.sharding-strategy=kafkalike",
+ "chat.backend.inmemory.num-shards=10",
+ "chat.backend.inmemory.owned-shards=2" })
+class InMemoryWithNoStorageAndShardingApplicationStartupIT extends AbstractApplicationStartupIT
+{
+}
--- /dev/null
+package de.juplo.kafka.chat.backend;
+
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.kafka.test.context.EmbeddedKafka;
+
+import static de.juplo.kafka.chat.backend.KafkaApplicationStartupIT.DATA_TOPIC;
+import static de.juplo.kafka.chat.backend.KafkaApplicationStartupIT.INFO_TOPIC;
+
+
+@SpringBootTest(
+ webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT,
+ properties = {
+ "spring.main.allow-bean-definition-overriding=true",
+ "chat.backend.services=kafka",
+ "chat.backend.kafka.client-id-PREFIX=TEST",
+ "chat.backend.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}",
+ "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}",
+ "chat.backend.kafka.info-channel-topic=" + INFO_TOPIC,
+ "chat.backend.kafka.data-channel-topic=" + DATA_TOPIC,
+ "chat.backend.kafka.num-partitions=10",
+ })
+@EmbeddedKafka(
+ topics = { INFO_TOPIC, DATA_TOPIC },
+ partitions = 10)
+@Slf4j
+class KafkaApplicationStartupIT extends AbstractApplicationStartupIT
+{
+ final static String INFO_TOPIC = "KAFKA_APPLICATION_STARTUP_IT_INFO_CHANNEL";
+ final static String DATA_TOPIC = "KAFKA_APPLICATION_STARTUP_IT_DATA_CHANNEL";
+}
--- /dev/null
+package de.juplo.kafka.chat.backend.implementation.kafka;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
+import de.juplo.kafka.chat.backend.ChatBackendProperties;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.common.TopicPartition;
+import org.awaitility.Awaitility;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Test;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration;
+import org.springframework.boot.autoconfigure.task.TaskExecutionAutoConfiguration;
+import org.springframework.boot.context.properties.EnableConfigurationProperties;
+import org.springframework.context.annotation.Bean;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.test.context.EmbeddedKafka;
+import org.springframework.test.context.TestPropertySource;
+import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
+
+import java.time.Clock;
+import java.time.Duration;
+import java.util.List;
+
+import static de.juplo.kafka.chat.backend.domain.ChatHomeServiceWithShardsTest.NUM_SHARDS;
+import static de.juplo.kafka.chat.backend.implementation.kafka.InfoChannelTest.DATA_TOPIC;
+import static de.juplo.kafka.chat.backend.implementation.kafka.InfoChannelTest.INFO_TOPIC;
+import static org.assertj.core.api.Assertions.assertThat;
+
+
+@SpringJUnitConfig(classes = {
+ KafkaAutoConfiguration.class,
+ TaskExecutionAutoConfiguration.class,
+ KafkaServicesConfiguration.class,
+ InfoChannelTest.InfoChannelTestConfiguration.class
+})
+@EnableConfigurationProperties(ChatBackendProperties.class)
+@TestPropertySource(properties = {
+ "chat.backend.services=kafka",
+ "chat.backend.kafka.client-id-PREFIX=TEST",
+ "chat.backend.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}",
+ "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}",
+ "chat.backend.kafka.info-channel-topic=" + INFO_TOPIC,
+ "chat.backend.kafka.data-channel-topic=" + DATA_TOPIC,
+ "chat.backend.kafka.num-partitions=" + NUM_SHARDS,
+})
+@EmbeddedKafka(
+ topics = { INFO_TOPIC, DATA_TOPIC },
+ partitions = NUM_SHARDS)
+@Slf4j
+public class InfoChannelTest
+{
+ final static String INFO_TOPIC = "INFO_CHANNEL_TEST_INFO";
+ final static String DATA_TOPIC = "INFO_CHANNEL_TEST_DATA";
+
+
+ @Autowired
+ InfoChannel infoChannel;
+
+
+ @Test
+ @DisplayName("The loading completes, if the topic is empty")
+ void testLoadingCompletesIfTopicEmpty()
+ {
+ Awaitility
+ .await()
+ .atMost(Duration.ofSeconds(15))
+ .untilAsserted(() -> assertThat(infoChannel.getChannelState()).isEqualTo(ChannelState.READY));
+ }
+
+
+ static class InfoChannelTestConfiguration
+ {
+ @Bean
+ WorkAssignor infoChannelWorkAssignor()
+ {
+ return consumer ->
+ {
+ List<TopicPartition> partitions = consumer
+ .partitionsFor(INFO_TOPIC)
+ .stream()
+ .map(partitionInfo -> new TopicPartition(INFO_TOPIC, partitionInfo.partition()))
+ .toList();
+ consumer.assign(partitions);
+ };
+ }
+
+ @Bean
+ WorkAssignor dataChannelWorkAssignor()
+ {
+ return consumer -> log.info("No work is assigned to the DataChannel!");
+ }
+
+ @Bean
+ ObjectMapper objectMapper()
+ {
+ ObjectMapper objectMapper = new ObjectMapper();
+ objectMapper.registerModule(new JavaTimeModule());
+ return objectMapper;
+ }
+
+ @Bean
+ Clock clock()
+ {
+ return Clock.systemDefaultZone();
+ }
+ }
+}