import de.juplo.kafka.chat.backend.ChatBackendProperties;
import de.juplo.kafka.chat.backend.domain.*;
-import de.juplo.kafka.chat.backend.persistence.inmemory.InMemoryChatHomeService;
-import de.juplo.kafka.chat.backend.persistence.inmemory.ShardingStrategy;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import java.time.Clock;
import java.time.LocalDateTime;
-import java.util.Set;
import java.util.UUID;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.*;
@SpringBootTest(properties = {
"spring.main.allow-bean-definition-overriding=true",
- "chat.backend.inmemory.sharding-strategy=kafkalike",
- "chat.backend.inmemory.num-shards=10",
- "chat.backend.inmemory.owned-shards=6",
})
@AutoConfigureWebTestClient
@Slf4j
{
@Autowired
ChatBackendProperties properties;
- @Autowired
- ShardingStrategy shardingStrategy;
@MockBean
- InMemoryChatHomeService chatHomeService;
+ ChatHome chatHome;
@MockBean
ChatRoomService chatRoomService;
void testUnknownChatroomExceptionForListChatroom(@Autowired WebTestClient client)
{
// Given
- UUID chatroomId = getRandomIdForOwnedShard();
- when(chatHomeService.getChatRoom(anyInt(), any(UUID.class))).thenReturn(Mono.empty());
- when(chatHomeService.getOwnedShards()).thenReturn(new int[] { 6 });
+ UUID chatroomId = UUID.randomUUID();
+ when(chatHome.getChatRoom(eq(chatroomId))).thenThrow(new UnknownChatroomException(chatroomId));
// When
WebTestClient.ResponseSpec responseSpec = client
void testUnknownChatroomExceptionForGetChatroom(@Autowired WebTestClient client)
{
// Given
- UUID chatroomId = getRandomIdForOwnedShard();
- when(chatHomeService.getChatRoom(anyInt(), any(UUID.class))).thenReturn(Mono.empty());
- when(chatHomeService.getOwnedShards()).thenReturn(new int[] { 6 });
+ UUID chatroomId = UUID.randomUUID();
+ when(chatHome.getChatRoom(eq(chatroomId))).thenThrow(new UnknownChatroomException(chatroomId));
// When
WebTestClient.ResponseSpec responseSpec = client
void testUnknownChatroomExceptionForPutMessage(@Autowired WebTestClient client)
{
// Given
- UUID chatroomId = getRandomIdForOwnedShard();
+ UUID chatroomId = UUID.randomUUID();
String username = "foo";
Long messageId = 66l;
- when(chatHomeService.getChatRoom(anyInt(), any(UUID.class))).thenReturn(Mono.empty());
- when(chatHomeService.getOwnedShards()).thenReturn(new int[] { 6 });
+ when(chatHome.getChatRoom(eq(chatroomId))).thenThrow(new UnknownChatroomException(chatroomId));
// When
WebTestClient.ResponseSpec responseSpec = client
void testUnknownChatroomExceptionForGetMessage(@Autowired WebTestClient client)
{
// Given
- UUID chatroomId = getRandomIdForOwnedShard();
+ UUID chatroomId = UUID.randomUUID();
String username = "foo";
Long messageId = 66l;
- when(chatHomeService.getChatRoom(anyInt(), any(UUID.class))).thenReturn(Mono.empty());
- when(chatHomeService.getOwnedShards()).thenReturn(new int[] { 6 });
+ when(chatHome.getChatRoom(eq(chatroomId))).thenThrow(new UnknownChatroomException(chatroomId));
// When
WebTestClient.ResponseSpec responseSpec = client
void testUnknownChatroomExceptionForListenChatroom(@Autowired WebTestClient client)
{
// Given
- UUID chatroomId = getRandomIdForOwnedShard();
- when(chatHomeService.getChatRoom(anyInt(), any(UUID.class))).thenReturn(Mono.empty());
- when(chatHomeService.getOwnedShards()).thenReturn(new int[] { 6 });
+ UUID chatroomId = UUID.randomUUID();
+ when(chatHome.getChatRoom(eq(chatroomId))).thenThrow(new UnknownChatroomException(chatroomId));
// When
WebTestClient.ResponseSpec responseSpec = client
void testMessageMutationException(@Autowired WebTestClient client)
{
// Given
- UUID chatroomId = getRandomIdForOwnedShard();
+ UUID chatroomId = UUID.randomUUID();
String user = "foo";
Long messageId = 66l;
Message.MessageKey key = Message.MessageKey.of(user, messageId);
0,
Clock.systemDefaultZone(),
chatRoomService, 8);
- when(chatHomeService.getChatRoom(anyInt(), any(UUID.class))).thenReturn(Mono.just(chatRoom));
+ when(chatHome.getChatRoom(eq(chatroomId))).thenReturn(Mono.just(chatRoom));
Message existingMessage = new Message(
key,
serialNumberExistingMessage,
void testInvalidUsernameException(@Autowired WebTestClient client)
{
// Given
- UUID chatroomId = getRandomIdForOwnedShard();
+ UUID chatroomId = UUID.randomUUID();
String user = "Foo";
Long messageId = 66l;
Message.MessageKey key = Message.MessageKey.of(user, messageId);
0,
Clock.systemDefaultZone(),
chatRoomService, 8);
- when(chatHomeService.getChatRoom(anyInt(), any(UUID.class)))
+ when(chatHome.getChatRoom(any(UUID.class)))
.thenReturn(Mono.just(chatRoom));
when(chatRoomService.getMessage(any(Message.MessageKey.class)))
.thenReturn(Mono.empty());
void testShardNotOwnedExceptionForGetChatroom(@Autowired WebTestClient client)
{
// Given
- UUID chatroomId = getRandomIdForForeignShard();
+ UUID chatroomId = UUID.randomUUID();
+ int shard = 666;
+ when(chatHome.getChatRoom(eq(chatroomId))).thenThrow(new ShardNotOwnedException(shard));
// When
WebTestClient.ResponseSpec responseSpec = client
.exchange();
// Then
- assertProblemDetailsForShardNotOwnedException(responseSpec, shardingStrategy.selectShard(chatroomId));
+ assertProblemDetailsForShardNotOwnedException(responseSpec, shard);
}
@Test
void testShardNotOwnedExceptionForListChatroom(@Autowired WebTestClient client)
{
// Given
- UUID chatroomId = getRandomIdForForeignShard();
+ UUID chatroomId = UUID.randomUUID();
+ int shard = 666;
+ when(chatHome.getChatRoom(eq(chatroomId))).thenThrow(new ShardNotOwnedException(shard));
// When
WebTestClient.ResponseSpec responseSpec = client
.exchange();
// Then
- assertProblemDetailsForShardNotOwnedException(responseSpec, shardingStrategy.selectShard(chatroomId));
+ assertProblemDetailsForShardNotOwnedException(responseSpec, shard);
}
@Test
- @DisplayName("Assert expected problem-details for now owned shard on PUT /put/{chatroomId}/{username}/{messageId}")
+ @DisplayName("Assert expected problem-details for not owned shard on PUT /put/{chatroomId}/{username}/{messageId}")
void testShardNotOwnedExceptionForPutMessage(@Autowired WebTestClient client)
{
// Given
- UUID chatroomId = getRandomIdForForeignShard();
+ UUID chatroomId = UUID.randomUUID();
String username = "foo";
Long messageId = 66l;
- when(chatHomeService.getChatRoom(anyInt(), any(UUID.class))).thenReturn(Mono.empty());
+ int shard = 666;
+ when(chatHome.getChatRoom(eq(chatroomId))).thenThrow(new ShardNotOwnedException(shard));
// When
WebTestClient.ResponseSpec responseSpec = client
.exchange();
// Then
- assertProblemDetailsForShardNotOwnedException(responseSpec, shardingStrategy.selectShard(chatroomId));
+ assertProblemDetailsForShardNotOwnedException(responseSpec, shard);
}
@Test
void testShardNotOwnedExceptionForGetMessage(@Autowired WebTestClient client)
{
// Given
- UUID chatroomId = getRandomIdForForeignShard();
+ UUID chatroomId = UUID.randomUUID();
String username = "foo";
Long messageId = 66l;
- when(chatHomeService.getChatRoom(anyInt(), any(UUID.class))).thenReturn(Mono.empty());
+ int shard = 666;
+ when(chatHome.getChatRoom(eq(chatroomId))).thenThrow(new ShardNotOwnedException(shard));
// When
WebTestClient.ResponseSpec responseSpec = client
.exchange();
// Then
- assertProblemDetailsForShardNotOwnedException(responseSpec, shardingStrategy.selectShard(chatroomId));
+ assertProblemDetailsForShardNotOwnedException(responseSpec, shard);
}
@Test
void testShardNotOwnedExceptionForListenChatroom(@Autowired WebTestClient client)
{
// Given
- UUID chatroomId = getRandomIdForForeignShard();
- when(chatHomeService.getChatRoom(anyInt(), any(UUID.class))).thenReturn(Mono.empty());
+ UUID chatroomId = UUID.randomUUID();
+ int shard = 666;
+ when(chatHome.getChatRoom(eq(chatroomId))).thenThrow(new ShardNotOwnedException(shard));
// When
WebTestClient.ResponseSpec responseSpec = client
.exchange();
// Then
- assertProblemDetailsForShardNotOwnedException(responseSpec, shardingStrategy.selectShard(chatroomId));
+ assertProblemDetailsForShardNotOwnedException(responseSpec, shard);
}
private void assertProblemDetailsForShardNotOwnedException(
.jsonPath("$.type").isEqualTo("/problem/shard-not-owned")
.jsonPath("$.shard").isEqualTo(shard);
}
-
- private UUID getRandomIdForOwnedShard()
- {
- Set<Integer> ownedShards = ownedShards();
- UUID randomId;
-
- do
- {
- randomId = UUID.randomUUID();
- }
- while (!ownedShards.contains(shardingStrategy.selectShard(randomId)));
-
- return randomId;
- }
-
- private UUID getRandomIdForForeignShard()
- {
- Set<Integer> ownedShards = ownedShards();
- UUID randomId;
-
- do
- {
- randomId = UUID.randomUUID();
- }
- while (ownedShards.contains(shardingStrategy.selectShard(randomId)));
-
- return randomId;
- }
-
- private Set<Integer> ownedShards()
- {
- return IntStream
- .of(properties.getInmemory().getOwnedShards())
- .mapToObj(shard -> Integer.valueOf(shard))
- .collect(Collectors.toSet());
- }
}
--- /dev/null
+package de.juplo.kafka.chat.backend.persistence.inmemory;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import de.juplo.kafka.chat.backend.domain.ChatHomeWithShardsTestBase;
+import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
+import de.juplo.kafka.chat.backend.persistence.storage.files.FilesStorageStrategy;
+import org.springframework.boot.test.context.TestConfiguration;
+import org.springframework.context.annotation.Bean;
+
+import java.nio.file.Paths;
+import java.time.Clock;
+import java.util.stream.IntStream;
+
+
+public class ShardedChatHomeTest extends ChatHomeWithShardsTestBase
+{
+ @TestConfiguration
+ static class Configuration
+ {
+ @Bean
+ ShardedChatHome chatHome(
+ Integer numShards,
+ int[] ownedShards,
+ InMemoryChatHomeService chatHomeService)
+ {
+ SimpleChatHome[] chatHomes = new SimpleChatHome[numShards];
+
+ IntStream
+ .of(ownedShards)
+ .forEach(shard -> chatHomes[shard] = new SimpleChatHome(chatHomeService, shard));
+
+ ShardingStrategy strategy = new KafkaLikeShardingStrategy(numShards);
+
+ return new ShardedChatHome(chatHomes, strategy);
+ }
+
+ @Bean
+ InMemoryChatHomeService chatHomeService(
+ Integer numShards,
+ int[] ownedShards,
+ StorageStrategy storageStrategy)
+ {
+ return new InMemoryChatHomeService(
+ numShards,
+ ownedShards,
+ storageStrategy.read());
+ }
+
+ @Bean
+ public FilesStorageStrategy storageStrategy(Integer numShards)
+ {
+ return new FilesStorageStrategy(
+ Paths.get("target", "test-classes", "data", "files"),
+ Clock.systemDefaultZone(),
+ 8,
+ new KafkaLikeShardingStrategy(numShards),
+ messageFlux -> new InMemoryChatRoomService(messageFlux),
+ new ObjectMapper());
+ }
+
+ @Bean
+ Integer numShards()
+ {
+ return 10;
+ }
+
+ @Bean
+ int[] ownedShards()
+ {
+ return new int[] { 2 };
+ }
+ }
+}
package de.juplo.kafka.chat.backend.persistence.inmemory;
-import de.juplo.kafka.chat.backend.domain.ChatRoom;
-import de.juplo.kafka.chat.backend.domain.ChatRoomService;
-import org.junit.jupiter.api.DisplayName;
-import org.junit.jupiter.api.Test;
-import reactor.core.publisher.Mono;
-
+import com.fasterxml.jackson.databind.ObjectMapper;
+import de.juplo.kafka.chat.backend.domain.ChatHomeTestBase;
+import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
+import de.juplo.kafka.chat.backend.persistence.storage.files.FilesStorageStrategy;
+import org.springframework.boot.test.context.TestConfiguration;
+import org.springframework.context.annotation.Bean;
+
+import java.nio.file.Paths;
import java.time.Clock;
-import java.util.UUID;
-
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyInt;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-import static pl.rzrz.assertj.reactor.Assertions.assertThat;
-public class SimpleChatHomeTest
+public class SimpleChatHomeTest extends ChatHomeTestBase
{
- @Test
- @DisplayName("Assert chatroom is delivered, if it exists")
- void testGetExistingChatroom()
+ @TestConfiguration
+ static class Configuration
{
- // Given
- InMemoryChatHomeService chatHomeService = mock(InMemoryChatHomeService.class);
- ChatRoom chatRoom = new ChatRoom(
- UUID.randomUUID(),
- "Foo",
- 0,
- Clock.systemDefaultZone(),
- mock(ChatRoomService.class),
- 8);
- when(chatHomeService.getChatRoom(anyInt(), any(UUID.class))).thenReturn(Mono.just(chatRoom));
- SimpleChatHome chatHome = new SimpleChatHome(chatHomeService);
-
- // When
- Mono<ChatRoom> mono = chatHome.getChatRoom(chatRoom.getId());
-
- // Then
- assertThat(mono).emitsExactly(chatRoom);
- }
-
- @Test
- @DisplayName("Assert UnknownChatroomException is thrown, if chatroom does not exist")
- void testGetNonExistentChatroom()
- {
- // Given
- InMemoryChatHomeService chatHomeService = mock(InMemoryChatHomeService.class);
- when(chatHomeService.getChatRoom(anyInt(), any(UUID.class))).thenReturn(Mono.empty());
- SimpleChatHome chatHome = new SimpleChatHome(chatHomeService);
-
- // When
- Mono<ChatRoom> mono = chatHome.getChatRoom(UUID.randomUUID());
-
- // Then
- assertThat(mono).sendsError();
+ @Bean
+ SimpleChatHome chatHome(InMemoryChatHomeService chatHomeService)
+ {
+ return new SimpleChatHome(chatHomeService);
+ }
+
+ @Bean
+ InMemoryChatHomeService chatHomeService(StorageStrategy storageStrategy)
+ {
+ return new InMemoryChatHomeService(
+ 1,
+ new int[] { 0 },
+ storageStrategy.read());
+ }
+
+ @Bean
+ public FilesStorageStrategy storageStrategy()
+ {
+ return new FilesStorageStrategy(
+ Paths.get("target", "test-classes", "data", "files"),
+ Clock.systemDefaultZone(),
+ 8,
+ chatRoomId -> 0,
+ messageFlux -> new InMemoryChatRoomService(messageFlux),
+ new ObjectMapper());
+ }
}
}