refactor: Streamlined API of `Chatroom` - all return-types are reactive
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / domain / Chatroom.java
1 package de.juplo.kafka.chat.backend.domain;
2
3 import lombok.Getter;
4 import lombok.RequiredArgsConstructor;
5 import lombok.Value;
6 import lombok.extern.slf4j.Slf4j;
7 import reactor.core.publisher.Flux;
8 import reactor.core.publisher.Mono;
9 import reactor.core.publisher.Sinks;
10
11 import java.time.LocalDateTime;
12 import java.util.*;
13
14
15 @RequiredArgsConstructor
16 @Slf4j
17 public class Chatroom
18 {
19   @Getter
20   private final UUID id;
21   @Getter
22   private final String name;
23   private final LinkedHashMap<MessageKey, Message> messages = new LinkedHashMap<>();
24   private final Sinks.Many<Message> sink = Sinks.many().multicast().onBackpressureBuffer();
25
26   synchronized public Mono<Message> addMessage(
27       Long id,
28       LocalDateTime timestamp,
29       String user,
30       String text)
31   {
32     return persistMessage(id, timestamp, user, text)
33         .doOnNext(message -> sink.tryEmitNext(message).orThrow());
34   }
35
36   private Mono<Message> persistMessage(
37       Long id,
38       LocalDateTime timestamp,
39       String user,
40       String text)
41   {
42     Message message = new Message(id, (long)messages.size(), timestamp, user, text);
43
44     MessageKey key = new MessageKey(user, id);
45     Message existing = messages.get(key);
46     if (existing != null)
47     {
48       log.info("Message with key {} already exists; {}", key, existing);
49       if (!message.equals(existing))
50         throw new MessageMutationException(message, existing);
51       return Mono.empty();
52     }
53
54     messages.put(key, message);
55     return Mono
56         .fromSupplier(() -> message)
57         .log();
58   }
59
60   public Mono<Message> getMessage(String username, Long messageId)
61   {
62     return Mono.fromSupplier(() ->
63     {
64       MessageKey key = MessageKey.of(username, messageId);
65       return messages.get(key);
66     });
67   }
68
69   public Flux<Message> listen()
70   {
71     return sink.asFlux();
72   }
73
74   public Flux<Message> getMessages(long first, long last)
75   {
76     return Flux.fromStream(messages
77         .values()
78         .stream()
79         .filter(message ->
80         {
81           long serial = message.getSerialNumber();
82           return serial >= first && serial <= last;
83         }));
84   }
85
86
87   @Value(staticConstructor = "of")
88   static class MessageKey
89   {
90     String username;
91     Long messageId;
92   }
93 }