package de.juplo.kafka.chat.backend.persistence.inmemory;
-import de.juplo.kafka.chat.backend.domain.ChatHome;
-import de.juplo.kafka.chat.backend.domain.ChatRoom;
-import de.juplo.kafka.chat.backend.domain.ShardNotOwnedException;
+import de.juplo.kafka.chat.backend.domain.*;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@Slf4j
public class ShardedChatHome implements ChatHome
{
- private final ChatHome[] chatHomes;
+ private final SimpleChatHome[] chatHomes;
private final Set<Integer> ownedShards;
private final ShardingStrategy shardingStrategy;
public ShardedChatHome(
- ChatHome[] chatHomes,
+ SimpleChatHome[] chatHomes,
ShardingStrategy shardingStrategy)
{
this.chatHomes = chatHomes;
@Override
- public Mono<ChatRoom> getChatRoom(UUID id)
+ public Mono<ChatRoomInfo> createChatRoom(UUID id, String name)
+ {
+ int shard = shardingStrategy.selectShard(id);
+ return chatHomes[shard] == null
+ ? Mono.error(new ShardNotOwnedException(shard))
+ : chatHomes[shard].createChatRoom(id, name);
+ }
+
+ @Override
+ public Mono<ChatRoomInfo> getChatRoomInfo(UUID id)
{
int shard = selectShard(id);
return chatHomes[shard] == null
? Mono.error(new ShardNotOwnedException(shard))
- : chatHomes[shard].getChatRoom(id);
+ : chatHomes[shard]
+ .getChatRoomInfo(id)
+ .onErrorMap(throwable -> throwable instanceof UnknownChatroomException
+ ? new UnknownChatroomException(
+ id,
+ shard,
+ ownedShards.stream().mapToInt(i -> i.intValue()).toArray())
+ : throwable);
}
@Override
- public Flux<ChatRoom> getChatRooms()
+ public Flux<ChatRoomInfo> getChatRoomInfo()
{
return Flux
.fromIterable(ownedShards)
- .flatMap(shard -> chatHomes[shard].getChatRooms());
+ .flatMap(shard -> chatHomes[shard].getChatRoomInfo());
}
+ @Override
+ public Mono<ChatRoomData> getChatRoomData(UUID id)
+ {
+ int shard = selectShard(id);
+ return chatHomes[shard] == null
+ ? Mono.error(new ShardNotOwnedException(shard))
+ : chatHomes[shard]
+ .getChatRoomData(id)
+ .onErrorMap(throwable -> throwable instanceof UnknownChatroomException
+ ? new UnknownChatroomException(
+ id,
+ shard,
+ ownedShards.stream().mapToInt(i -> i.intValue()).toArray())
+ : throwable);
+ }
+
+ public Flux<ChatRoomData> getChatRoomData()
+ {
+ return Flux
+ .fromIterable(ownedShards)
+ .flatMap(shard -> chatHomes[shard].getChatRoomData());
+ }
+
+
private int selectShard(UUID chatroomId)
{