--- /dev/null
+package de.juplo.kafka.chat.backend.domain;
+
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Test;
+import reactor.core.publisher.Mono;
+
+import java.util.UUID;
+
+import static pl.rzrz.assertj.reactor.Assertions.assertThat;
+
+
+public class ChatHomeWithShardsTestBase extends ChatHomeTestBase
+{
+ @Test
+ @DisplayName("Assert ShardNotOwnedException is thrown, if the shard for the chatroom is not owned")
+ void testGetChatroomForNotOwnedShard()
+ {
+ // Given
+ UUID chatRoomId = UUID.fromString("4e7246a6-29ae-43ea-b56f-669c3481ac19");
+
+ // When
+ Mono<ChatRoom> mono = chatHome.getChatRoom(chatRoomId);
+
+ // Then
+ assertThat(mono).sendsError(e ->
+ {
+ assertThat(e).isInstanceOf(UnknownChatroomException.class);
+ UnknownChatroomException unknownChatroomException = (UnknownChatroomException) e;
+ assertThat(unknownChatroomException.getChatroomId()).isEqualTo(chatRoomId);
+ });
+ }
+}
--- /dev/null
+package de.juplo.kafka.chat.backend.persistence.inmemory;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import de.juplo.kafka.chat.backend.domain.ChatHomeWithShardsTestBase;
+import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
+import de.juplo.kafka.chat.backend.persistence.storage.files.FilesStorageStrategy;
+import org.springframework.boot.test.context.TestConfiguration;
+import org.springframework.context.annotation.Bean;
+
+import java.nio.file.Paths;
+import java.time.Clock;
+import java.util.stream.IntStream;
+
+
+public class ShardedChatHomeTest extends ChatHomeWithShardsTestBase
+{
+ @TestConfiguration
+ static class Configuration
+ {
+ @Bean
+ ShardedChatHome chatHome(
+ Integer numShards,
+ int[] ownedShards,
+ InMemoryChatHomeService chatHomeService)
+ {
+ SimpleChatHome[] chatHomes = new SimpleChatHome[numShards];
+
+ IntStream
+ .of(ownedShards)
+ .forEach(shard -> chatHomes[shard] = new SimpleChatHome(chatHomeService, shard));
+
+ ShardingStrategy strategy = new KafkaLikeShardingStrategy(numShards);
+
+ return new ShardedChatHome(chatHomes, strategy);
+ }
+
+ @Bean
+ InMemoryChatHomeService chatHomeService(
+ Integer numShards,
+ int[] ownedShards,
+ StorageStrategy storageStrategy)
+ {
+ return new InMemoryChatHomeService(
+ numShards,
+ ownedShards,
+ storageStrategy.read());
+ }
+
+ @Bean
+ public FilesStorageStrategy storageStrategy(Integer numShards)
+ {
+ return new FilesStorageStrategy(
+ Paths.get("target", "test-classes", "data", "files"),
+ Clock.systemDefaultZone(),
+ 8,
+ new KafkaLikeShardingStrategy(numShards),
+ messageFlux -> new InMemoryChatRoomService(messageFlux),
+ new ObjectMapper());
+ }
+
+ @Bean
+ Integer numShards()
+ {
+ return 10;
+ }
+
+ @Bean
+ int[] ownedShards()
+ {
+ return new int[] { 2 };
+ }
+ }
+}