import de.juplo.kafka.chat.backend.domain.ChatHome;
import de.juplo.kafka.chat.backend.domain.ChatRoom;
import de.juplo.kafka.chat.backend.domain.ShardNotOwnedException;
+import de.juplo.kafka.chat.backend.domain.UnknownChatroomException;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
int shard = selectShard(id);
return chatHomes[shard] == null
? Mono.error(new ShardNotOwnedException(shard))
- : chatHomes[shard].getChatRoom(id);
+ : chatHomes[shard]
+ .getChatRoom(id)
+ .onErrorMap(throwable -> throwable instanceof UnknownChatroomException
+ ? Mono.error(new UnknownChatroomException(id, shard, ownedShards))
+ : Mono.error(throwable));
}
@Override
@Slf4j
public class SimpleChatHome implements ChatHome
{
- private final InMemoryChatHomeService service;
private final int shard;
+ private final Map<UUID, ChatRoom> chatrooms;
- public SimpleChatHome(InMemoryChatHomeService service, int shard)
+
+ public SimpleChatHome(
+ int shard,
+ Flux<ChatRoom> chatroomFlux)
{
log.info("Created SimpleChatHome for shard {}", shard);
- this.service = service;
- this.shard = shard;
- }
- public SimpleChatHome(InMemoryChatHomeService service)
- {
- this(service, 0);
+ this.chatrooms = new HashMap<>();
+ chatroomFlux
+ .filter(chatRoom ->
+ {
+ if (shard > -1 && chatRoom.getShard() == shard)
+ {
+ return true;
+ }
+ else
+ {
+ log.info(
+ "SimpleChatHome for shard {} ignores not owned chat-room {}",
+ shard,
+ chatRoom);
+ return false;
+ }
+ })
+ .toStream()
+ .forEach(chatroom -> chatrooms.put(chatroom.getId(), chatroom));
+ this.shard = shard;
}
@Override
public Mono<ChatRoom> getChatRoom(UUID id)
{
- return service
- .getChatRoom(shard, id)
- .switchIfEmpty(Mono.error(() -> new UnknownChatroomException(
- id,
- shard,
- service.getOwnedShards())));
+ return Mono
+ .justOrEmpty(chatrooms.get(id))
+ .switchIfEmpty(Mono.error(() -> new UnknownChatroomException(id)));
}
@Override
public Flux<ChatRoom> getChatRooms()
{
- return service.getChatRooms(shard);
+ return Flux.fromIterable(chatrooms.values());
}
}