package de.juplo.kafka.chat.backend.domain;
+import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
-import java.util.UUID;
+import java.util.*;
-public interface ChatHome
+@Slf4j
+public class ChatHome implements ChatHome
{
- Mono<ChatRoom> getChatRoom(UUID id);
+ private final ChatHomeService service;
+ private final int shard;
- Flux<ChatRoom> getChatRooms();
+
+ public ChatHome(ChatHomeService service, int shard)
+ {
+ log.info("Created SimpleChatHome for shard {}", shard);
+ this.service = service;
+ this.shard = shard;
+ }
+
+ public ChatHome(ChatHomeService service)
+ {
+ this(service, 0);
+ }
+
+
+ @Override
+ public Mono<ChatRoom> getChatRoom(UUID id)
+ {
+ return service
+ .getChatRoom(shard, id)
+ .switchIfEmpty(Mono.error(() -> new UnknownChatroomException(id)));
+ }
+
+ @Override
+ public Flux<ChatRoom> getChatRooms()
+ {
+ return service.getChatRooms(shard);
+ }
}
import reactor.core.publisher.Mono;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;
@Override
public Mono<ChatRoom> getChatRoom(UUID id)
{
- return chatHomes[selectShard(id)].getChatRoom(id);
+ int shard = selectShard(id);
+ if (ownedShards.contains(shard))
+ {
+ return chatHomes[shard].getChatRoom(id);
+ }
+ else
+ {
+ int[] ownedShards = new int[this.ownedShards.size()];
+ Iterator<Integer> iterator = this.ownedShards.iterator();
+ for (int i = 0; iterator.hasNext(); i++)
+ {
+ ownedShards[i] = iterator.next();
+ }
+ return Mono.error(new UnknownChatroomException(
+ id,
+ shard,
+ ownedShards));
+ }
}
@Override
+++ /dev/null
-package de.juplo.kafka.chat.backend.domain;
-
-import lombok.extern.slf4j.Slf4j;
-import reactor.core.publisher.Flux;
-import reactor.core.publisher.Mono;
-
-import java.util.*;
-
-
-@Slf4j
-public class SimpleChatHome implements ChatHome
-{
- private final ChatHomeService service;
- private final int shard;
-
-
- public SimpleChatHome(ChatHomeService service, int shard)
- {
- log.info("Created SimpleChatHome for shard {}", shard);
- this.service = service;
- this.shard = shard;
- }
-
- public SimpleChatHome(ChatHomeService service)
- {
- this(service, 0);
- }
-
-
- @Override
- public Mono<ChatRoom> getChatRoom(UUID id)
- {
- return service
- .getChatRoom(shard, id)
- .switchIfEmpty(Mono.error(() -> new UnknownChatroomException(id)));
- }
-
- @Override
- public Flux<ChatRoom> getChatRooms()
- {
- return service.getChatRooms(shard);
- }
-}
import lombok.Getter;
+import java.util.Arrays;
+import java.util.Optional;
import java.util.UUID;
+import java.util.stream.Collectors;
public class UnknownChatroomException extends RuntimeException
{
@Getter
private final UUID chatroomId;
+ @Getter
+ private final Optional<Integer> shard;
+ @Getter
+ private final Optional<int[]> ownedShards;
public UnknownChatroomException(UUID chatroomId)
{
super("Chatroom does not exist: " + chatroomId);
this.chatroomId = chatroomId;
+ this.shard = Optional.empty();
+ this.ownedShards = Optional.empty();
+ }
+
+ public UnknownChatroomException(UUID chatroomId, int shard, int[] ownedShards)
+ {
+ super(
+ "Chatroom does not exist (here): " +
+ chatroomId +
+ " shard=" +
+ shard +
+ ", owned=" +
+ Arrays
+ .stream(ownedShards)
+ .mapToObj(ownedShard -> Integer.toString(ownedShard))
+ .collect(Collectors.joining(",")));
+ this.chatroomId = chatroomId;
+ this.shard = Optional.of(shard);
+ this.ownedShards = Optional.of(ownedShards);
}
}
import de.juplo.kafka.chat.backend.domain.ChatRoom;
import de.juplo.kafka.chat.backend.domain.ChatHomeService;
+import de.juplo.kafka.chat.backend.domain.ShardingStrategy;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
public class InMemoryChatHomeService implements ChatHomeService
{
private final Map<UUID, ChatRoom>[] chatrooms;
+ private final Set<Integer> ownedShards;
+ private final ShardingStrategy shardingStrategy;
public InMemoryChatHomeService(
{
return Flux.fromStream(chatrooms[shard].values().stream());
}
+
+
+ private int selectShard(UUID chatroomId)
+ {
+ return shardingStrategy.selectShard(chatroomId);
+ }
}
import de.juplo.kafka.chat.backend.persistence.KafkaLikeShardingStrategy;
import de.juplo.kafka.chat.backend.domain.ShardingStrategy;
import de.juplo.kafka.chat.backend.domain.ChatHome;
-import de.juplo.kafka.chat.backend.domain.SimpleChatHome;
import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
matchIfMissing = true)
ChatHome noneShardingChatHome(InMemoryChatHomeService chatHomeService)
{
- return new SimpleChatHome(chatHomeService);
+ return new ChatHome(chatHomeService);
}
@Bean
StorageStrategy storageStrategy)
{
int numShards = properties.getInmemory().getNumShards();
- SimpleChatHome[] chatHomes = new SimpleChatHome[numShards];
+ ChatHome[] chatHomes = new ChatHome[numShards];
storageStrategy
.read()
.subscribe(chatRoom ->
{
int shard = chatRoom.getShard();
if (chatHomes[shard] == null)
- chatHomes[shard] = new SimpleChatHome(chatHomeService, shard);
+ chatHomes[shard] = new ChatHome(chatHomeService, shard);
});
ShardingStrategy strategy = new KafkaLikeShardingStrategy(numShards);
return new ShardedChatHome(chatHomes, strategy);
--- /dev/null
+package de.juplo.kafka.chat.backend.domain;
+
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Test;
+import reactor.core.publisher.Mono;
+
+import java.time.Clock;
+import java.util.UUID;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static pl.rzrz.assertj.reactor.Assertions.assertThat;
+
+
+public class ChatHomeTest
+{
+ @Test
+ @DisplayName("Assert chatroom is delivered, if it exists")
+ void testGetExistingChatroom()
+ {
+ // Given
+ ChatHomeService chatHomeService = mock(ChatHomeService.class);
+ ChatRoom chatRoom = new ChatRoom(
+ UUID.randomUUID(),
+ "Foo",
+ 0,
+ Clock.systemDefaultZone(),
+ mock(ChatRoomService.class),
+ 8);
+ when(chatHomeService.getChatRoom(anyInt(), any(UUID.class))).thenReturn(Mono.just(chatRoom));
+ ChatHome chatHome = new ChatHome(chatHomeService);
+
+ // When
+ Mono<ChatRoom> mono = chatHome.getChatRoom(chatRoom.getId());
+
+ // Then
+ assertThat(mono).emitsExactly(chatRoom);
+ }
+
+ @Test
+ @DisplayName("Assert UnknownChatroomException is thrown, if chatroom does not exist")
+ void testGetNonExistentChatroom()
+ {
+ // Given
+ ChatHomeService chatHomeService = mock(ChatHomeService.class);
+ when(chatHomeService.getChatRoom(anyInt(), any(UUID.class))).thenReturn(Mono.empty());
+ ChatHome chatHome = new ChatHome(chatHomeService);
+
+ // When
+ Mono<ChatRoom> mono = chatHome.getChatRoom(UUID.randomUUID());
+
+ // Then
+ assertThat(mono).sendsError();
+ }
+}
+++ /dev/null
-package de.juplo.kafka.chat.backend.domain;
-
-import org.junit.jupiter.api.DisplayName;
-import org.junit.jupiter.api.Test;
-import reactor.core.publisher.Mono;
-
-import java.time.Clock;
-import java.util.UUID;
-
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyInt;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-import static pl.rzrz.assertj.reactor.Assertions.assertThat;
-
-
-public class SimpleChatHomeTest
-{
- @Test
- @DisplayName("Assert chatroom is delivered, if it exists")
- void testGetExistingChatroom()
- {
- // Given
- ChatHomeService chatHomeService = mock(ChatHomeService.class);
- ChatRoom chatRoom = new ChatRoom(
- UUID.randomUUID(),
- "Foo",
- 0,
- Clock.systemDefaultZone(),
- mock(ChatRoomService.class),
- 8);
- when(chatHomeService.getChatRoom(anyInt(), any(UUID.class))).thenReturn(Mono.just(chatRoom));
- SimpleChatHome chatHome = new SimpleChatHome(chatHomeService);
-
- // When
- Mono<ChatRoom> mono = chatHome.getChatRoom(chatRoom.getId());
-
- // Then
- assertThat(mono).emitsExactly(chatRoom);
- }
-
- @Test
- @DisplayName("Assert UnknownChatroomException is thrown, if chatroom does not exist")
- void testGetNonExistentChatroom()
- {
- // Given
- ChatHomeService chatHomeService = mock(ChatHomeService.class);
- when(chatHomeService.getChatRoom(anyInt(), any(UUID.class))).thenReturn(Mono.empty());
- SimpleChatHome chatHome = new SimpleChatHome(chatHomeService);
-
- // When
- Mono<ChatRoom> mono = chatHome.getChatRoom(UUID.randomUUID());
-
- // Then
- assertThat(mono).sendsError();
- }
-}
protected void start()
{
StorageStrategyITConfig config = getConfig();
- chathome = new SimpleChatHome(config.getChatHomeService());
+ chathome = new ChatHome(config.getChatHomeService());
chatRoomFactory = config.getChatRoomFactory();
}