--- /dev/null
+package de.juplo.kafka.chat.backend.domain;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
+import de.juplo.kafka.chat.backend.ChatBackendProperties;
+import de.juplo.kafka.chat.backend.implementation.kafka.ChannelNotReadyException;
+import de.juplo.kafka.chat.backend.domain.exceptions.UnknownChatroomException;
+import de.juplo.kafka.chat.backend.implementation.inmemory.InMemoryServicesConfiguration;
+import de.juplo.kafka.chat.backend.implementation.kafka.KafkaServicesConfiguration;
+import de.juplo.kafka.chat.backend.storage.files.FilesStorageConfiguration;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Test;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.context.properties.EnableConfigurationProperties;
+import org.springframework.context.annotation.Bean;
+import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
+import reactor.core.publisher.Mono;
+import reactor.util.retry.Retry;
+
+import java.time.Clock;
+import java.time.Duration;
+import java.util.UUID;
+
+import static pl.rzrz.assertj.reactor.Assertions.assertThat;
+
+
+@SpringJUnitConfig(classes = {
+ InMemoryServicesConfiguration.class,
+ FilesStorageConfiguration.class,
+ KafkaServicesConfiguration.class,
+ ChatHomeServiceTest.TestConfiguration.class })
+@EnableConfigurationProperties(ChatBackendProperties.class)
+public abstract class ChatHomeServiceTest
+{
+ @Autowired
+ ChatHomeService chatHomeService;
+
+
+ @Test
+ @DisplayName("Assert chatroom is delivered, if it exists")
+ void testGetExistingChatroom()
+ {
+ // Given
+ UUID chatRoomId = UUID.fromString("5c73531c-6fc4-426c-adcb-afc5c140a0f7");
+
+ // When
+ Mono<ChatRoomData> mono = Mono
+ .defer(() -> chatHomeService.getChatRoomData(chatRoomId))
+ .log("testGetExistingChatroom")
+ .retryWhen(Retry
+ .backoff(5, Duration.ofSeconds(1))
+ .filter(throwable -> throwable instanceof ChannelNotReadyException));
+
+ // Then
+ assertThat(mono).emitsCount(1);
+ }
+
+ @Test
+ @DisplayName("Assert UnknownChatroomException is thrown, if chatroom does not exist")
+ void testGetNonExistentChatroom()
+ {
+ // Given
+ UUID chatRoomId = UUID.fromString("7f59ec77-832e-4a17-8d22-55ef46242c17");
+
+ // When
+ Mono<ChatRoomData> mono = Mono
+ .defer(() -> chatHomeService.getChatRoomData(chatRoomId))
+ .log("testGetNonExistentChatroom")
+ .retryWhen(Retry
+ .backoff(5, Duration.ofSeconds(1))
+ .filter(throwable -> throwable instanceof ChannelNotReadyException));
+
+ // Then
+ assertThat(mono).sendsError(e ->
+ {
+ assertThat(e).isInstanceOf(UnknownChatroomException.class);
+ UnknownChatroomException unknownChatroomException = (UnknownChatroomException) e;
+ assertThat(unknownChatroomException.getChatroomId()).isEqualTo(chatRoomId);
+ });
+ }
+
+ static class TestConfiguration
+ {
+ @Bean
+ ObjectMapper objectMapper()
+ {
+ ObjectMapper objectMapper = new ObjectMapper();
+ objectMapper.registerModule(new JavaTimeModule());
+ return objectMapper;
+ }
+
+ @Bean
+ Clock clock()
+ {
+ return Clock.systemDefaultZone();
+ }
+ }
+}
--- /dev/null
+package de.juplo.kafka.chat.backend.domain;
+
+import de.juplo.kafka.chat.backend.implementation.kafka.ChannelNotReadyException;
+import de.juplo.kafka.chat.backend.domain.exceptions.ShardNotOwnedException;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Test;
+import reactor.core.publisher.Mono;
+import reactor.util.retry.Retry;
+
+import java.time.Duration;
+import java.util.UUID;
+
+import static pl.rzrz.assertj.reactor.Assertions.assertThat;
+
+
+public abstract class ChatHomeServiceWithShardsTest extends ChatHomeServiceTest
+{
+ public static final int NUM_SHARDS = 10;
+ public static final int OWNED_SHARD = 2;
+ public static final int NOT_OWNED_SHARD = 0;
+
+
+ @Test
+ @DisplayName("Assert ShardNotOwnedException is thrown, if the shard for the chatroom is not owned")
+ void testGetChatroomForNotOwnedShard()
+ {
+ // Given
+ UUID chatRoomId = UUID.fromString("4e7246a6-29ae-43ea-b56f-669c3481ac19");
+
+ // When
+ Mono<ChatRoomData> mono = Mono
+ .defer(() -> chatHomeService.getChatRoomData(chatRoomId))
+ .log("testGetChatroomForNotOwnedShard")
+ .retryWhen(Retry
+ .backoff(5, Duration.ofSeconds(1))
+ .filter(throwable -> throwable instanceof ChannelNotReadyException));
+
+ // Then
+ assertThat(mono).sendsError(e ->
+ {
+ assertThat(e).isInstanceOf(ShardNotOwnedException.class);
+ ShardNotOwnedException shardNotOwnedException = (ShardNotOwnedException) e;
+ assertThat(shardNotOwnedException.getShard()).isEqualTo(NOT_OWNED_SHARD);
+ });
+ }
+}
+++ /dev/null
-package de.juplo.kafka.chat.backend.domain;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
-import de.juplo.kafka.chat.backend.ChatBackendProperties;
-import de.juplo.kafka.chat.backend.implementation.kafka.ChannelNotReadyException;
-import de.juplo.kafka.chat.backend.domain.exceptions.UnknownChatroomException;
-import de.juplo.kafka.chat.backend.implementation.inmemory.InMemoryServicesConfiguration;
-import de.juplo.kafka.chat.backend.implementation.kafka.KafkaServicesConfiguration;
-import de.juplo.kafka.chat.backend.storage.files.FilesStorageConfiguration;
-import org.junit.jupiter.api.DisplayName;
-import org.junit.jupiter.api.Test;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.boot.context.properties.EnableConfigurationProperties;
-import org.springframework.context.annotation.Bean;
-import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
-import reactor.core.publisher.Mono;
-import reactor.util.retry.Retry;
-
-import java.time.Clock;
-import java.time.Duration;
-import java.util.UUID;
-
-import static pl.rzrz.assertj.reactor.Assertions.assertThat;
-
-
-@SpringJUnitConfig(classes = {
- InMemoryServicesConfiguration.class,
- FilesStorageConfiguration.class,
- KafkaServicesConfiguration.class,
- ChatHomeServiceTest.TestConfiguration.class })
-@EnableConfigurationProperties(ChatBackendProperties.class)
-public abstract class ChatHomeServiceTest
-{
- @Autowired
- ChatHomeService chatHomeService;
-
-
- @Test
- @DisplayName("Assert chatroom is delivered, if it exists")
- void testGetExistingChatroom()
- {
- // Given
- UUID chatRoomId = UUID.fromString("5c73531c-6fc4-426c-adcb-afc5c140a0f7");
-
- // When
- Mono<ChatRoomData> mono = Mono
- .defer(() -> chatHomeService.getChatRoomData(chatRoomId))
- .log("testGetExistingChatroom")
- .retryWhen(Retry
- .backoff(5, Duration.ofSeconds(1))
- .filter(throwable -> throwable instanceof ChannelNotReadyException));
-
- // Then
- assertThat(mono).emitsCount(1);
- }
-
- @Test
- @DisplayName("Assert UnknownChatroomException is thrown, if chatroom does not exist")
- void testGetNonExistentChatroom()
- {
- // Given
- UUID chatRoomId = UUID.fromString("7f59ec77-832e-4a17-8d22-55ef46242c17");
-
- // When
- Mono<ChatRoomData> mono = Mono
- .defer(() -> chatHomeService.getChatRoomData(chatRoomId))
- .log("testGetNonExistentChatroom")
- .retryWhen(Retry
- .backoff(5, Duration.ofSeconds(1))
- .filter(throwable -> throwable instanceof ChannelNotReadyException));
-
- // Then
- assertThat(mono).sendsError(e ->
- {
- assertThat(e).isInstanceOf(UnknownChatroomException.class);
- UnknownChatroomException unknownChatroomException = (UnknownChatroomException) e;
- assertThat(unknownChatroomException.getChatroomId()).isEqualTo(chatRoomId);
- });
- }
-
- static class TestConfiguration
- {
- @Bean
- ObjectMapper objectMapper()
- {
- ObjectMapper objectMapper = new ObjectMapper();
- objectMapper.registerModule(new JavaTimeModule());
- return objectMapper;
- }
-
- @Bean
- Clock clock()
- {
- return Clock.systemDefaultZone();
- }
- }
-}
+++ /dev/null
-package de.juplo.kafka.chat.backend.domain;
-
-import de.juplo.kafka.chat.backend.implementation.kafka.ChannelNotReadyException;
-import de.juplo.kafka.chat.backend.domain.exceptions.ShardNotOwnedException;
-import org.junit.jupiter.api.DisplayName;
-import org.junit.jupiter.api.Test;
-import reactor.core.publisher.Mono;
-import reactor.util.retry.Retry;
-
-import java.time.Duration;
-import java.util.UUID;
-
-import static pl.rzrz.assertj.reactor.Assertions.assertThat;
-
-
-public abstract class ChatHomeServiceWithShardsTest extends ChatHomeServiceTest
-{
- public static final int NUM_SHARDS = 10;
- public static final int OWNED_SHARD = 2;
- public static final int NOT_OWNED_SHARD = 0;
-
-
- @Test
- @DisplayName("Assert ShardNotOwnedException is thrown, if the shard for the chatroom is not owned")
- void testGetChatroomForNotOwnedShard()
- {
- // Given
- UUID chatRoomId = UUID.fromString("4e7246a6-29ae-43ea-b56f-669c3481ac19");
-
- // When
- Mono<ChatRoomData> mono = Mono
- .defer(() -> chatHomeService.getChatRoomData(chatRoomId))
- .log("testGetChatroomForNotOwnedShard")
- .retryWhen(Retry
- .backoff(5, Duration.ofSeconds(1))
- .filter(throwable -> throwable instanceof ChannelNotReadyException));
-
- // Then
- assertThat(mono).sendsError(e ->
- {
- assertThat(e).isInstanceOf(ShardNotOwnedException.class);
- ShardNotOwnedException shardNotOwnedException = (ShardNotOwnedException) e;
- assertThat(shardNotOwnedException.getShard()).isEqualTo(NOT_OWNED_SHARD);
- });
- }
-}
+++ /dev/null
-package de.juplo.kafka.chat.backend.implementation.inmemory;
-
-import de.juplo.kafka.chat.backend.domain.ChatHomeServiceWithShardsTest;
-import org.springframework.test.context.TestPropertySource;
-
-import static de.juplo.kafka.chat.backend.domain.ChatHomeServiceWithShardsTest.NUM_SHARDS;
-import static de.juplo.kafka.chat.backend.domain.ChatHomeServiceWithShardsTest.OWNED_SHARD;
-
-
-@TestPropertySource(properties = {
- "chat.backend.inmemory.sharding-strategy=kafkalike",
- "chat.backend.inmemory.num-shards=" + NUM_SHARDS,
- "chat.backend.inmemory.owned-shards=" + OWNED_SHARD,
- "chat.backend.inmemory.storage-strategy=files",
- "chat.backend.inmemory.storage-directory=target/test-classes/data/files" })
-public class ShardedChatHomeServiceTest extends ChatHomeServiceWithShardsTest
-{
-}
--- /dev/null
+package de.juplo.kafka.chat.backend.implementation.inmemory;
+
+import de.juplo.kafka.chat.backend.domain.ChatHomeServiceWithShardsTest;
+import org.springframework.test.context.TestPropertySource;
+
+import static de.juplo.kafka.chat.backend.domain.ChatHomeServiceWithShardsTest.NUM_SHARDS;
+import static de.juplo.kafka.chat.backend.domain.ChatHomeServiceWithShardsTest.OWNED_SHARD;
+
+
+@TestPropertySource(properties = {
+ "chat.backend.inmemory.sharding-strategy=kafkalike",
+ "chat.backend.inmemory.num-shards=" + NUM_SHARDS,
+ "chat.backend.inmemory.owned-shards=" + OWNED_SHARD,
+ "chat.backend.inmemory.storage-strategy=files",
+ "chat.backend.inmemory.storage-directory=target/test-classes/data/files" })
+public class ShardedChatHomeServiceTest extends ChatHomeServiceWithShardsTest
+{
+}
--- /dev/null
+package de.juplo.kafka.chat.backend.implementation.inmemory;
+
+import de.juplo.kafka.chat.backend.domain.ChatHomeServiceTest;
+import org.springframework.test.context.TestPropertySource;
+
+
+@TestPropertySource(properties = {
+ "chat.backend.inmemory.sharding-strategy=none",
+ "chat.backend.inmemory.storage-strategy=files",
+ "chat.backend.inmemory.storage-directory=target/test-classes/data/files" })
+public class SimpleChatHomeServiceTest extends ChatHomeServiceTest
+{
+}
+++ /dev/null
-package de.juplo.kafka.chat.backend.implementation.inmemory;
-
-import de.juplo.kafka.chat.backend.domain.ChatHomeServiceTest;
-import org.springframework.test.context.TestPropertySource;
-
-
-@TestPropertySource(properties = {
- "chat.backend.inmemory.sharding-strategy=none",
- "chat.backend.inmemory.storage-strategy=files",
- "chat.backend.inmemory.storage-directory=target/test-classes/data/files" })
-public class SimpleChatHomeServiceTest extends ChatHomeServiceTest
-{
-}
+++ /dev/null
-package de.juplo.kafka.chat.backend.implementation.kafka;
-
-import de.juplo.kafka.chat.backend.domain.ChatHomeServiceWithShardsTest;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.common.TopicPartition;
-import org.junit.jupiter.api.BeforeAll;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration;
-import org.springframework.boot.autoconfigure.task.TaskExecutionAutoConfiguration;
-import org.springframework.boot.test.context.TestConfiguration;
-import org.springframework.context.annotation.Bean;
-import org.springframework.kafka.core.KafkaTemplate;
-import org.springframework.kafka.test.context.EmbeddedKafka;
-import org.springframework.test.context.ContextConfiguration;
-import org.springframework.test.context.TestPropertySource;
-
-import java.util.List;
-
-import static de.juplo.kafka.chat.backend.domain.ChatHomeServiceWithShardsTest.NUM_SHARDS;
-import static de.juplo.kafka.chat.backend.implementation.kafka.KafkaChatHomeServiceTest.DATA_TOPIC;
-import static de.juplo.kafka.chat.backend.implementation.kafka.KafkaChatHomeServiceTest.INFO_TOPIC;
-
-
-@ContextConfiguration(classes = {
- KafkaTestUtils.KafkaTestConfiguration.class,
- KafkaAutoConfiguration.class,
- TaskExecutionAutoConfiguration.class,
- })
-@TestPropertySource(properties = {
- "chat.backend.services=kafka",
- "chat.backend.kafka.client-id-PREFIX=TEST",
- "chat.backend.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}",
- "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}",
- "chat.backend.kafka.info-channel-topic=" + INFO_TOPIC,
- "chat.backend.kafka.data-channel-topic=" + DATA_TOPIC,
- "chat.backend.kafka.num-partitions=" + NUM_SHARDS,
-})
-@EmbeddedKafka(
- topics = { INFO_TOPIC, DATA_TOPIC },
- partitions = NUM_SHARDS)
-@Slf4j
-public class KafkaChatHomeServiceTest extends ChatHomeServiceWithShardsTest
-{
- final static String INFO_TOPIC = "KAFKA_CHAT_HOME_TEST_INFO";
- final static String DATA_TOPIC = "KAFKA_CHAT_HOME_TEST_DATA";
-
-
- @BeforeAll
- static void sendAndLoadStoredData(
- @Autowired KafkaTemplate<String, String> messageTemplate,
- @Autowired ChannelTaskExecutor infoChannelTaskExecutor,
- @Autowired ChannelTaskExecutor dataChannelTaskExecutor)
- {
- KafkaTestUtils.initKafkaSetup(
- INFO_TOPIC,
- DATA_TOPIC,
- messageTemplate,
- infoChannelTaskExecutor,
- dataChannelTaskExecutor);
- }
-
-
- @TestConfiguration
- static class KafkaChatHomeServiceTestConfiguration
- {
- @Bean
- WorkAssignor infoChannelWorkAssignor()
- {
- return consumer ->
- {
- List<TopicPartition> partitions = consumer
- .partitionsFor(INFO_TOPIC)
- .stream()
- .map(partitionInfo -> new TopicPartition(INFO_TOPIC, partitionInfo.partition()))
- .toList();
- consumer.assign(partitions);
- };
- }
- }
-}
--- /dev/null
+package de.juplo.kafka.chat.backend.implementation.kafka;
+
+import de.juplo.kafka.chat.backend.domain.ChatHomeServiceWithShardsTest;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.common.TopicPartition;
+import org.junit.jupiter.api.BeforeAll;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration;
+import org.springframework.boot.autoconfigure.task.TaskExecutionAutoConfiguration;
+import org.springframework.boot.test.context.TestConfiguration;
+import org.springframework.context.annotation.Bean;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.test.context.EmbeddedKafka;
+import org.springframework.test.context.ContextConfiguration;
+import org.springframework.test.context.TestPropertySource;
+
+import java.util.List;
+
+import static de.juplo.kafka.chat.backend.domain.ChatHomeServiceWithShardsTest.NUM_SHARDS;
+import static de.juplo.kafka.chat.backend.implementation.kafka.KafkaChatHomeServiceTest.DATA_TOPIC;
+import static de.juplo.kafka.chat.backend.implementation.kafka.KafkaChatHomeServiceTest.INFO_TOPIC;
+
+
+@ContextConfiguration(classes = {
+ KafkaTestUtils.KafkaTestConfiguration.class,
+ KafkaAutoConfiguration.class,
+ TaskExecutionAutoConfiguration.class,
+ })
+@TestPropertySource(properties = {
+ "chat.backend.services=kafka",
+ "chat.backend.kafka.client-id-PREFIX=TEST",
+ "chat.backend.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}",
+ "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}",
+ "chat.backend.kafka.info-channel-topic=" + INFO_TOPIC,
+ "chat.backend.kafka.data-channel-topic=" + DATA_TOPIC,
+ "chat.backend.kafka.num-partitions=" + NUM_SHARDS,
+})
+@EmbeddedKafka(
+ topics = { INFO_TOPIC, DATA_TOPIC },
+ partitions = NUM_SHARDS)
+@Slf4j
+public class KafkaChatHomeServiceTest extends ChatHomeServiceWithShardsTest
+{
+ final static String INFO_TOPIC = "KAFKA_CHAT_HOME_TEST_INFO";
+ final static String DATA_TOPIC = "KAFKA_CHAT_HOME_TEST_DATA";
+
+
+ @BeforeAll
+ static void sendAndLoadStoredData(
+ @Autowired KafkaTemplate<String, String> messageTemplate,
+ @Autowired ChannelTaskExecutor infoChannelTaskExecutor,
+ @Autowired ChannelTaskExecutor dataChannelTaskExecutor)
+ {
+ KafkaTestUtils.initKafkaSetup(
+ INFO_TOPIC,
+ DATA_TOPIC,
+ messageTemplate,
+ infoChannelTaskExecutor,
+ dataChannelTaskExecutor);
+ }
+
+
+ @TestConfiguration
+ static class KafkaChatHomeServiceTestConfiguration
+ {
+ @Bean
+ WorkAssignor infoChannelWorkAssignor()
+ {
+ return consumer ->
+ {
+ List<TopicPartition> partitions = consumer
+ .partitionsFor(INFO_TOPIC)
+ .stream()
+ .map(partitionInfo -> new TopicPartition(INFO_TOPIC, partitionInfo.partition()))
+ .toList();
+ consumer.assign(partitions);
+ };
+ }
+ }
+}