import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
-import java.time.Clock;
-import java.util.HashMap;
-import java.util.LinkedHashMap;
-import java.util.Map;
-import java.util.UUID;
+import java.util.*;
@Slf4j
-public class InMemoryChatHomeService implements ChatHomeService<InMemoryChatRoomService>
+public class InMemoryChatHomeService implements ChatHomeService
{
- private final Map<UUID, ChatRoom> chatrooms;
- private final Clock clock;
- private final int bufferSize;
+ private final Map<UUID, ChatRoom>[] chatrooms;
public InMemoryChatHomeService(
- Flux<ChatRoom> chatroomFlux,
- Clock clock,
- int bufferSize)
+ int numShards,
+ int[] ownedShards,
+ Flux<ChatRoom> chatroomFlux)
{
- log.debug("Creating InMemoryChatHomeService with buffer-size {} (for created ChatRoom's)", bufferSize);
- this.chatrooms = new HashMap<>();
- chatroomFlux.toStream().forEach(chatroom -> chatrooms.put(chatroom.getId(), chatroom));
- this.clock = clock;
- this.bufferSize = bufferSize;
+ log.debug("Creating InMemoryChatHomeService");
+ this.chatrooms = new Map[numShards];
+ Set<Integer> owned = Arrays
+ .stream(ownedShards)
+ .collect(
+ () -> new HashSet<>(),
+ (set, i) -> set.add(i),
+ (a, b) -> a.addAll(b));
+ for (int shard = 0; shard < numShards; shard++)
+ {
+ chatrooms[shard] = owned.contains(shard)
+ ? new HashMap<>()
+ : null;
+ }
+ chatroomFlux
+ .filter(chatRoom ->
+ {
+ if (owned.contains(chatRoom.getShard()))
+ {
+ return true;
+ }
+ else
+ {
+ log.info("Ignoring not owned chat-room {}", chatRoom);
+ return false;
+ }
+ })
+ .toStream()
+ .forEach(chatroom -> chatrooms[chatroom.getShard()].put(chatroom.getId(), chatroom));
}
- @Override
- public Mono<ChatRoom> createChatRoom(UUID id, String name)
+ public void putChatRoom(ChatRoom chatRoom)
{
- InMemoryChatRoomService service =
- new InMemoryChatRoomService(new LinkedHashMap<>());
- ChatRoom chatRoom = new ChatRoom(id, name, clock, service, bufferSize);
- chatrooms.put(chatRoom.getId(), chatRoom);
- return Mono.just(chatRoom);
+ chatrooms[chatRoom.getShard()].put(chatRoom.getId(), chatRoom);
}
@Override
- public Mono<ChatRoom> getChatRoom(UUID id)
+ public Mono<ChatRoom> getChatRoom(int shard, UUID id)
{
- return Mono.justOrEmpty(chatrooms.get(id));
+ return Mono.justOrEmpty(chatrooms[shard].get(id));
}
@Override
- public Flux<ChatRoom> getChatRooms()
+ public Flux<ChatRoom> getChatRooms(int shard)
{
- return Flux.fromStream(chatrooms.values().stream());
+ return Flux.fromStream(chatrooms[shard].values().stream());
}
}