feat: The size buffer for listeners to a chatroom is configurable
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / domain / Chatroom.java
1 package de.juplo.kafka.chat.backend.domain;
2
3 import lombok.Getter;
4 import lombok.extern.slf4j.Slf4j;
5 import reactor.core.publisher.Flux;
6 import reactor.core.publisher.Mono;
7 import reactor.core.publisher.Sinks;
8
9 import java.time.LocalDateTime;
10 import java.util.*;
11
12
13 @Slf4j
14 public class Chatroom
15 {
16   @Getter
17   private final UUID id;
18   @Getter
19   private final String name;
20   private final PersistenceStrategy persistence;
21   private final Sinks.Many<Message> sink;
22
23   public Chatroom(
24       UUID id,
25       String name,
26       PersistenceStrategy persistence,
27       int bufferSize)
28   {
29     this.id = id;
30     this.name = name;
31     this.persistence = persistence;
32     this.sink = Sinks.many().multicast().onBackpressureBuffer(bufferSize);
33   }
34
35
36   synchronized public Mono<Message> addMessage(
37       Long id,
38       LocalDateTime timestamp,
39       String user,
40       String text)
41   {
42     return persistence
43         .persistMessage(Message.MessageKey.of(user, id), timestamp, text)
44         .doOnNext(message -> sink.tryEmitNext(message).orThrow());
45   }
46
47
48   public Mono<Message> getMessage(String username, Long messageId)
49   {
50     return persistence.getMessage(Message.MessageKey.of(username, messageId));
51   }
52
53   public Flux<Message> listen()
54   {
55     return sink.asFlux();
56   }
57
58   public Flux<Message> getMessages()
59   {
60     return getMessages(0, Long.MAX_VALUE);
61   }
62
63   public Flux<Message> getMessages(long first, long last)
64   {
65     return persistence.getMessages(first, last);
66   }
67 }