WIP
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / persistence / inmemory / SimpleChatHome.java
1 package de.juplo.kafka.chat.backend.persistence.inmemory;
2
3 import de.juplo.kafka.chat.backend.domain.ChatHome;
4 import de.juplo.kafka.chat.backend.domain.ChatRoom;
5 import de.juplo.kafka.chat.backend.domain.UnknownChatroomException;
6 import lombok.extern.slf4j.Slf4j;
7 import reactor.core.publisher.Flux;
8 import reactor.core.publisher.Mono;
9
10 import java.util.*;
11
12
13 @Slf4j
14 public class SimpleChatHome implements ChatHome
15 {
16   private final Map<UUID, ChatRoom> chatrooms;
17
18
19
20   public SimpleChatHome(Flux<ChatRoom> chatroomFlux)
21   {
22     this(chatroomFlux, null);
23   }
24
25   public SimpleChatHome(
26       Integer shard,
27       Flux<ChatRoom> chatroomFlux)
28   {
29     log.info("Created SimpleChatHome for shard {}", shard);
30
31     this.chatrooms = new HashMap<>();
32     chatroomFlux
33         .filter(chatRoom ->
34         {
35           if (shard == null && chatRoom.getShard() == shard)
36           {
37             return true;
38           }
39           else
40           {
41             log.info(
42                 "SimpleChatHome for shard {} ignores not owned chat-room {}",
43                 shard,
44                 chatRoom);
45             return false;
46           }
47         })
48         .toStream()
49         .forEach(chatroom -> chatrooms.put(chatroom.getId(), chatroom));
50   }
51
52
53   @Override
54   public Mono<ChatRoom> getChatRoom(UUID id)
55   {
56     return Mono
57         .justOrEmpty(chatrooms.get(id))
58         .switchIfEmpty(Mono.error(() -> new UnknownChatroomException(id)));
59   }
60
61   @Override
62   public Flux<ChatRoom> getChatRooms()
63   {
64     return Flux.fromIterable(chatrooms.values());
65   }
66 }