package de.juplo.kafka.chat.backend.persistence.inmemory;
-import de.juplo.kafka.chat.backend.domain.ChatRoom;
-import de.juplo.kafka.chat.backend.domain.ChatHomeService;
+import de.juplo.kafka.chat.backend.domain.*;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.UUID;
+import java.util.*;
@Slf4j
-public class InMemoryChatHomeService implements ChatHomeService<InMemoryChatRoomService>
+public class InMemoryChatHomeService implements ChatHomeService
{
private final Map<UUID, ChatRoom>[] chatrooms;
+ private final Set<Integer> ownedShards;
+ private final ShardingStrategy shardingStrategy;
- public InMemoryChatHomeService(int numShards, Flux<ChatRoom> chatroomFlux)
+ public InMemoryChatHomeService(
+ int numShards,
+ int[] ownedShards,
+ ShardingStrategy shardingStrategy,
+ Flux<ChatRoom> chatroomFlux)
{
log.debug("Creating InMemoryChatHomeService");
+
this.chatrooms = new Map[numShards];
+
+ this.ownedShards = Arrays
+ .stream(ownedShards)
+ .collect(
+ () -> new HashSet<>(),
+ (set, i) -> set.add(i),
+ (a, b) -> a.addAll(b));
+
+ this.shardingStrategy = shardingStrategy;
+
for (int shard = 0; shard < numShards; shard++)
- chatrooms[shard] = new HashMap<>();
+ {
+ chatrooms[shard] = this.ownedShards.contains(shard)
+ ? new HashMap<>()
+ : null;
+ }
chatroomFlux
+ .filter(chatRoom ->
+ {
+ if (this.ownedShards.contains(chatRoom.getShard()))
+ {
+ return true;
+ }
+ else
+ {
+ log.info("Ignoring not owned chat-room {}", chatRoom);
+ return false;
+ }
+ })
.toStream()
.forEach(chatroom -> chatrooms[chatroom.getShard()].put(chatroom.getId(), chatroom));
}
- @Override
- public Mono<ChatRoom> putChatRoom(ChatRoom chatRoom)
+ void putChatRoom(ChatRoom chatRoom)
{
- chatrooms[chatRoom.getShard()].put(chatRoom.getId(), chatRoom);
- return Mono.just(chatRoom);
+ UUID id = chatRoom.getId();
+ int shard = shardingStrategy.selectShard(id);
+ if (!ownedShards.contains(shard))
+ throw new ShardNotOwnedException(this, chatRoom, shard, ownedShards);
+ chatrooms[shard].put(id, chatRoom);
}
@Override
- public Mono<ChatRoom> getChatRoom(int shard, UUID id)
+ public Mono<ChatRoom> getChatRoom(UUID id)
{
- return Mono.justOrEmpty(chatrooms[shard].get(id));
+ int shard = shardingStrategy.selectShard(id);
+ if (ownedShards.contains(shard))
+ {
+ return Mono.justOrEmpty(chatrooms[shard].get(id));
+ }
+ else
+ {
+ int[] ownedShards = new int[this.ownedShards.size()];
+ Iterator<Integer> iterator = this.ownedShards.iterator();
+ for (int i = 0; iterator.hasNext(); i++)
+ {
+ ownedShards[i] = iterator.next();
+ }
+ return Mono.error(new UnknownChatroomException(
+ id,
+ shard,
+ ownedShards));
+ }
}
@Override
- public Flux<ChatRoom> getChatRooms(int shard)
+ public Flux<ChatRoom> getChatRooms()
{
- return Flux.fromStream(chatrooms[shard].values().stream());
+ return Flux
+ .fromIterable(ownedShards)
+ .flatMap(shard -> Flux.fromIterable(chatrooms[shard].values()));
}
}