c2d25b201750af9638c667ab2fb18d0fb72a8ea2
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / persistence / inmemory / SimpleChatHome.java
1 package de.juplo.kafka.chat.backend.persistence.inmemory;
2
3 import de.juplo.kafka.chat.backend.domain.*;
4 import lombok.extern.slf4j.Slf4j;
5 import reactor.core.publisher.Flux;
6 import reactor.core.publisher.Mono;
7
8 import java.time.Clock;
9 import java.util.*;
10
11
12 @Slf4j
13 public class SimpleChatHome implements ChatHome
14 {
15   private final Integer shard;
16   private final Map<UUID, ChatRoom> chatRooms;
17   private final Clock clock;
18   private final int bufferSize;
19
20
21
22   public SimpleChatHome(
23       Flux<ChatRoom> chatroomFlux,
24       Clock clock,
25       int bufferSize)
26   {
27     this(null, chatroomFlux, clock, bufferSize);
28   }
29
30   public SimpleChatHome(
31       Integer shard,
32       Flux<ChatRoom> chatroomFlux,
33       Clock clock,
34       int bufferSize)
35   {
36     log.info("Created SimpleChatHome for shard {}", shard);
37 ;
38     this.shard = shard;
39     this.chatRooms = new HashMap<>();
40     chatroomFlux
41         .filter(chatRoom ->
42         {
43           if (shard == null || chatRoom.getShard() == shard)
44           {
45             return true;
46           }
47           else
48           {
49             log.info(
50                 "SimpleChatHome for shard {} ignores not owned chat-room {}",
51                 shard,
52                 chatRoom);
53             return false;
54           }
55         })
56         .toStream()
57         .forEach(chatroom -> chatRooms.put(chatroom.getId(), chatroom));
58     this.clock = clock;
59     this.bufferSize = bufferSize;
60   }
61
62
63   @Override
64   public Mono<ChatRoomInfo> createChatRoom(UUID id, String name)
65   {
66     log.info("Creating ChatRoom with buffer-size {}", bufferSize);
67     ChatRoomService service = new InMemoryChatRoomService(Flux.empty());
68     ChatRoom chatRoom = new ChatRoom(id, name, shard, clock, service, bufferSize);
69     chatRooms.put(id, chatRoom);
70     return Mono.just(chatRoom);
71   }
72
73   @Override
74   public Mono<ChatRoom> getChatRoom(UUID id)
75   {
76     return Mono
77         .justOrEmpty(chatRooms.get(id))
78         .switchIfEmpty(Mono.error(() -> new UnknownChatroomException(id)));
79   }
80
81   @Override
82   public Flux<ChatRoom> getChatRooms()
83   {
84     return Flux.fromIterable(chatRooms.values());
85   }
86 }