1 package de.juplo.kafka.chat.backend.persistence.inmemory;
3 import de.juplo.kafka.chat.backend.domain.ChatRoom;
4 import de.juplo.kafka.chat.backend.domain.ChatHomeService;
5 import de.juplo.kafka.chat.backend.domain.ShardingStrategy;
6 import lombok.extern.slf4j.Slf4j;
7 import reactor.core.publisher.Flux;
8 import reactor.core.publisher.Mono;
/**
 * {@link ChatHomeService} implementation that keeps {@link ChatRoom}
 * instances in plain in-process maps, with one map slot per shard.
 */
14 public class InMemoryChatHomeService implements ChatHomeService
// Used to map the id of a chat-room to the index of the shard it belongs to.
16 private final ShardingStrategy shardingStrategy;
// Indexed by shard number; each slot maps chat-room id -> chat-room.
17 private final Map<UUID, ChatRoom>[] chatrooms;
/**
 * Sets up the per-shard chat-room maps and stores the chat-rooms whose
 * shard is contained in the set of owned shards.
 *
 * NOTE(review): part of the parameter list and of the body is not
 * visible in this excerpt (e.g. the declaration of {@code numShards} and
 * the source of the owned shard numbers) - confirm against the full file.
 *
 * @param shardingStrategy strategy used to select the shard for a chat-room id
 * @param chatroomFlux the chat-rooms to consider for the initial population
 */
20 public InMemoryChatHomeService(
21 ShardingStrategy shardingStrategy,
24 Flux<ChatRoom> chatroomFlux)
26 log.debug("Creating InMemoryChatHomeService");
27 this.shardingStrategy = shardingStrategy;
// One array slot per shard; generic arrays cannot be created, hence the raw Map[].
28 this.chatrooms = new Map[numShards];
// Collect the owned shard numbers into a HashSet for the contains() checks below.
29 Set<Integer> owned = Arrays
32 () -> new HashSet<>(),
33 (set, i) -> set.add(i),
34 (a, b) -> a.addAll(b));
// Initialise every slot; the value depends on whether the shard is owned
// (the rest of the expression is not visible here).
35 for (int shard = 0; shard < numShards; shard++)
37 chatrooms[shard] = owned.contains(shard)
// Determine the shard of each incoming chat-room and check its ownership.
44 int shard = shardingStrategy.selectShard(chatRoom.getId());
45 if (owned.contains(shard))
// Logged for chat-rooms whose shard is not in the owned set.
51 log.info("Ignoring not owned chat-room {}", chatRoom);
// Store the chat-room under its id in the map of its shard.
58 getChatRoomMapFor(chatRoom).put(chatRoom.getId(), chatRoom);
63 public Mono<ChatRoom> putChatRoom(ChatRoom chatRoom)
65 getChatRoomMapFor(chatRoom).put(chatRoom.getId(), chatRoom);
66 return Mono.just(chatRoom);
70 public Mono<ChatRoom> getChatRoom(int shard, UUID id)
72 return Mono.justOrEmpty(chatrooms[shard].get(id));
76 public Flux<ChatRoom> getChatRooms(int shard)
78 return Flux.fromStream(chatrooms[shard].values().stream());
82 private Map<UUID, ChatRoom> getChatRoomMapFor(ChatRoom chatRoom)
84 int shard = shardingStrategy.selectShard(chatRoom.getId());
85 return chatrooms[shard];