1 package de.juplo.kafka.chat.backend.persistence.inmemory;
3 import de.juplo.kafka.chat.backend.domain.*;
4 import lombok.extern.slf4j.Slf4j;
5 import reactor.core.publisher.Flux;
6 import reactor.core.publisher.Mono;
/**
 * {@link ChatHomeService} implementation that keeps chat rooms in in-process
 * maps, one map per shard, and only stores and serves rooms that belong to
 * shards owned by this instance.
 */
12 public class InMemoryChatHomeService implements ChatHomeService
// One map per shard, indexed by shard number; each map is keyed by chat-room id.
14 private final Map<UUID, ChatRoom>[] chatrooms;
// Shard numbers this instance owns; consulted before a room is stored or looked up.
15 private final Set<Integer> ownedShards;
// Maps a chat-room id to its shard number via selectShard(id).
16 private final ShardingStrategy shardingStrategy;
/**
 * Creates the service: allocates one map slot per shard, records the owned
 * shard numbers as a set, stores the sharding strategy and puts chat rooms
 * into the per-shard maps.
 *
 * NOTE(review): several lines of this constructor are not part of this
 * listing. {@code numShards} is used below but its declaration is not
 * visible, and the pipeline that feeds the final {@code forEach} is only
 * partly visible -- presumably it is driven by {@code chatroomFlux}; confirm
 * against the complete file.
 *
 * @param shardingStrategy strategy stored for later shard selection by chat-room id
 * @param chatroomFlux     chat rooms handed in at construction time
 */
19 public InMemoryChatHomeService(
22 ShardingStrategy shardingStrategy,
23 Flux<ChatRoom> chatroomFlux)
25 log.debug("Creating InMemoryChatHomeService");
// Java cannot create an array of a parameterized type, hence the raw Map[]
// (an unchecked assignment to Map<UUID, ChatRoom>[]).
27 this.chatrooms = new Map[numShards];
// Three-argument collect: new HashSet as container, add() as accumulator,
// addAll() as combiner -- the result is a mutable HashSet of shard numbers.
29 this.ownedShards = Arrays
32 () -> new HashSet<>(),
33 (set, i) -> set.add(i),
34 (a, b) -> a.addAll(b));
36 this.shardingStrategy = shardingStrategy;
// Every shard slot is assigned here; the value depends on whether the shard
// is owned (the two alternatives are on lines not shown in this listing).
38 for (int shard = 0; shard < numShards; shard++)
40 chatrooms[shard] = this.ownedShards.contains(shard)
// Ownership test on the room's own shard number; the log statement below
// reports rooms that are not owned as ignored.
47 if (this.ownedShards.contains(chatRoom.getShard()))
53 log.info("Ignoring not owned chat-room {}", chatRoom);
// NOTE(review): indexes by chatroom.getShard(), whereas putChatRoom and
// getChatRoom derive the shard from shardingStrategy.selectShard(id) --
// confirm that both always yield the same shard number.
58 .forEach(chatroom -> chatrooms[chatroom.getShard()].put(chatroom.getId(), chatroom));
/**
 * Stores the given chat room under its id in the map of the shard that the
 * sharding strategy selects for that id. An entry already stored under the
 * same id in that shard's map is replaced.
 *
 * @param chatRoom the chat room to store
 * @throws ShardNotOwnedException if the selected shard is not in {@code ownedShards}
 */
61 void putChatRoom(ChatRoom chatRoom)
63 UUID id = chatRoom.getId();
// The shard is derived from the id, not read from the room itself.
64 int shard = shardingStrategy.selectShard(id);
// Reject rooms of foreign shards before touching the shard array.
65 if (!ownedShards.contains(shard))
66 throw new ShardNotOwnedException(this, chatRoom, shard, ownedShards);
67 chatrooms[shard].put(id, chatRoom);
/**
 * Looks up a chat room by id.
 *
 * The shard is selected from the id by the sharding strategy. If that shard
 * is owned, the result of the map lookup is returned; an id with no entry
 * yields an empty {@code Mono}. Otherwise the method returns a {@code Mono}
 * that signals an {@code UnknownChatroomException}.
 *
 * NOTE(review): the constructor arguments of the exception are on lines that
 * are not part of this listing.
 *
 * @param id id of the requested chat room
 * @return a {@code Mono} emitting the room, completing empty, or signalling
 *         an error as described above
 */
71 public Mono<ChatRoom> getChatRoom(UUID id)
73 int shard = shardingStrategy.selectShard(id);
74 if (ownedShards.contains(shard))
// justOrEmpty turns a null lookup result into an empty Mono.
76 return Mono.justOrEmpty(chatrooms[shard].get(id));
// From here on the shard is not owned. The local array shadows the field of
// the same name, which is why the field is accessed via this.ownedShards.
80 int[] ownedShards = new int[this.ownedShards.size()];
81 Iterator<Integer> iterator = this.ownedShards.iterator();
// Copy the set element by element, unboxing each Integer into the int[].
82 for (int i = 0; iterator.hasNext(); i++)
84 ownedShards[i] = iterator.next();
// The error is delivered through the Mono instead of being thrown.
86 return Mono.error(new UnknownChatroomException(
/**
 * Streams the chat rooms of the owned shards: each shard number in
 * {@code ownedShards} is flat-mapped to the values of that shard's map.
 *
 * NOTE(review): the receiver of {@code fromIterable} is on a line that is not
 * part of this listing -- presumably {@code Flux}; confirm.
 *
 * @return the chat rooms stored in the maps of the owned shards
 */
94 public Flux<ChatRoom> getChatRooms()
97 .fromIterable(ownedShards)
98 .flatMap(shard -> Flux.fromIterable(chatrooms[shard].values()));