refactor: Fixed return-types of the controller
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / domain / Chatroom.java
1 package de.juplo.kafka.chat.backend.domain;
2
3 import lombok.Getter;
4 import lombok.RequiredArgsConstructor;
5 import lombok.extern.slf4j.Slf4j;
6 import reactor.core.publisher.Flux;
7 import reactor.core.publisher.Mono;
8 import reactor.core.publisher.Sinks;
9
10 import java.time.LocalDateTime;
11 import java.util.*;
12
13
14 @RequiredArgsConstructor
15 @Slf4j
16 public class Chatroom
17 {
18   @Getter
19   private final UUID id;
20   @Getter
21   private final String name;
22   private final PersistenceStrategy persistence;
23   private final Sinks.Many<Message> sink = Sinks.many().multicast().onBackpressureBuffer();
24
25   synchronized public Mono<Message> addMessage(
26       Long id,
27       LocalDateTime timestamp,
28       String user,
29       String text)
30   {
31     return persistence
32         .persistMessage(Message.MessageKey.of(user, id), timestamp, text)
33         .doOnNext(message -> sink.tryEmitNext(message).orThrow());
34   }
35
36
37   public Mono<Message> getMessage(String username, Long messageId)
38   {
39     return persistence.getMessage(Message.MessageKey.of(username, messageId));
40   }
41
42   public Flux<Message> listen()
43   {
44     return sink.asFlux();
45   }
46
47   public Flux<Message> getMessages()
48   {
49     return getMessages(0, Long.MAX_VALUE);
50   }
51
52   public Flux<Message> getMessages(long first, long last)
53   {
54     return persistence.getMessages(first, last);
55   }
56 }