--- /dev/null
+package de.juplo.kafka.chat.backend.domain;
+
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Test;
+import reactor.core.publisher.Mono;
+import reactor.util.retry.Retry;
+
+import java.time.Duration;
+import java.util.UUID;
+
+import static pl.rzrz.assertj.reactor.Assertions.assertThat;
+
+
+public abstract class ChatHomeWithShardsTest extends ChatHomeTest
+{
+ public static final int NUM_SHARDS = 10;
+ public static final int OWNED_SHARD = 2;
+ public static final int NOT_OWNED_SHARD = 0;
+
+
+ @Test
+ @DisplayName("Assert ShardNotOwnedException is thrown, if the shard for the chatroom is not owned")
+ void testGetChatroomForNotOwnedShard()
+ {
+ // Given
+ UUID chatRoomId = UUID.fromString("4e7246a6-29ae-43ea-b56f-669c3481ac19");
+
+ // When
+ Mono<ChatRoom> mono = Mono
+ .defer(() -> chatHome.getChatRoom(chatRoomId))
+ .log("testGetChatroomForNotOwnedShard")
+ .retryWhen(Retry
+ .backoff(5, Duration.ofSeconds(1))
+ .filter(throwable -> throwable instanceof LoadInProgressException));
+
+ // Then
+ assertThat(mono).sendsError(e ->
+ {
+ assertThat(e).isInstanceOf(ShardNotOwnedException.class);
+ ShardNotOwnedException shardNotOwnedException = (ShardNotOwnedException) e;
+ assertThat(shardNotOwnedException.getShard()).isEqualTo(NOT_OWNED_SHARD);
+ });
+ }
+}
--- /dev/null
+package de.juplo.kafka.chat.backend.persistence.inmemory;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import de.juplo.kafka.chat.backend.domain.ChatHomeWithShardsTest;
+import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
+import de.juplo.kafka.chat.backend.persistence.storage.files.FilesStorageStrategy;
+import org.springframework.boot.test.context.TestConfiguration;
+import org.springframework.context.annotation.Bean;
+
+import java.nio.file.Paths;
+import java.time.Clock;
+import java.util.stream.IntStream;
+
+public class ShardedChatHomeTest extends ChatHomeWithShardsTest
+{
+ @TestConfiguration
+ static class Configuration
+ {
+ @Bean
+ ShardedChatHome chatHome(
+ InMemoryChatHomeService chatHomeService)
+ {
+ SimpleChatHome[] chatHomes = new SimpleChatHome[NUM_SHARDS];
+
+ IntStream
+ .of(ownedShards())
+ .forEach(shard -> chatHomes[shard] = new SimpleChatHome(chatHomeService, shard));
+
+ ShardingStrategy strategy = new KafkaLikeShardingStrategy(NUM_SHARDS);
+
+ return new ShardedChatHome(chatHomes, strategy);
+ }
+
+ @Bean
+ InMemoryChatHomeService chatHomeService(
+ StorageStrategy storageStrategy)
+ {
+ return new InMemoryChatHomeService(
+ NUM_SHARDS,
+ ownedShards(),
+ storageStrategy.read());
+ }
+
+ @Bean
+ public FilesStorageStrategy storageStrategy()
+ {
+ return new FilesStorageStrategy(
+ Paths.get("target", "test-classes", "data", "files"),
+ Clock.systemDefaultZone(),
+ 8,
+ new KafkaLikeShardingStrategy(NUM_SHARDS),
+ messageFlux -> new InMemoryChatRoomService(messageFlux),
+ new ObjectMapper());
+ }
+
+ int[] ownedShards()
+ {
+ return new int[] { OWNED_SHARD };
+ }
+ }
+}
package de.juplo.kafka.chat.backend.persistence.kafka;
import de.juplo.kafka.chat.backend.ChatBackendProperties;
-import de.juplo.kafka.chat.backend.domain.ChatHomeTest;
+import de.juplo.kafka.chat.backend.domain.ChatHomeWithShardsTest;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.List;
import java.util.concurrent.CompletableFuture;
+import static de.juplo.kafka.chat.backend.domain.ChatHomeWithShardsTest.NUM_SHARDS;
import static de.juplo.kafka.chat.backend.persistence.kafka.KafkaChatHomeTest.TOPIC;
"chat.backend.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}",
"spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}",
"chat.backend.kafka.chatroom-channel-topic=" + TOPIC,
- "chat.backend.kafka.num-partitions=10",
+ "chat.backend.kafka.num-partitions=" + NUM_SHARDS,
})
@EmbeddedKafka(topics = { TOPIC }, partitions = 10)
@Slf4j
-public class KafkaChatHomeTest extends ChatHomeTest
+public class KafkaChatHomeTest extends ChatHomeWithShardsTest
{
final static String TOPIC = "KAFKA_CHAT_HOME_TEST";