refactor: Moved exceptions into package `exceptions` - Aligned Code
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / domain / ChatRoomData.java
1 package de.juplo.kafka.chat.backend.domain;
2
3 import de.juplo.kafka.chat.backend.domain.exceptions.InvalidUsernameException;
4 import de.juplo.kafka.chat.backend.domain.exceptions.MessageMutationException;
5 import lombok.extern.slf4j.Slf4j;
6 import reactor.core.publisher.Flux;
7 import reactor.core.publisher.Mono;
8 import reactor.core.publisher.Sinks;
9 import reactor.core.publisher.SynchronousSink;
10
11 import java.time.Clock;
12 import java.time.LocalDateTime;
13 import java.util.regex.Matcher;
14 import java.util.regex.Pattern;
15
16
17 @Slf4j
18 public class ChatRoomData
19 {
20   public final static Pattern VALID_USER = Pattern.compile("^[a-z0-9-]{2,}$");
21
22   private final ChatRoomService service;
23   private final Clock clock;
24   private final int bufferSize;
25   private Sinks.Many<Message> sink;
26
27
28   public ChatRoomData(
29       Clock clock,
30       ChatRoomService service,
31       int bufferSize)
32   {
33     log.info("Created ChatRoom with buffer-size {}", bufferSize);
34     this.clock = clock;
35     this.service = service;
36     this.bufferSize = bufferSize;
37     // @RequiredArgsConstructor unfortunately not possible, because
38     // the `bufferSize` is not set, if `createSink()` is called
39     // from the variable declaration!
40     this.sink = createSink();
41   }
42
43
44   synchronized public Mono<Message> addMessage(
45       Long id,
46       String user,
47       String text)
48   {
49     Matcher matcher = VALID_USER.matcher(user);
50     if (!matcher.matches())
51       throw new InvalidUsernameException(user);
52
53     Message.MessageKey key = Message.MessageKey.of(user, id);
54     return service
55         .getMessage(key)
56         .handle((Message existing, SynchronousSink<Message> sink) ->
57         {
58           if (existing.getMessageText().equals(text))
59           {
60             sink.next(existing);
61           }
62           else
63           {
64             sink.error(new MessageMutationException(existing, text));
65           }
66         })
67         .switchIfEmpty(
68             Mono
69                 .defer(() -> service.persistMessage(key, LocalDateTime.now(clock), text))
70                 .doOnNext(m ->
71                 {
72                   Sinks.EmitResult result = sink.tryEmitNext(m);
73                   if (result.isFailure())
74                   {
75                     log.warn("Emitting of message failed with {} for {}", result.name(), m);
76                   }
77                 }));
78   }
79
80
81   public ChatRoomService getChatRoomService()
82   {
83     return service;
84   }
85
86   public Mono<Message> getMessage(String username, Long messageId)
87   {
88     Message.MessageKey key = Message.MessageKey.of(username, messageId);
89     return service.getMessage(key);
90   }
91
92   synchronized public Flux<Message> listen()
93   {
94     return sink
95         .asFlux()
96         .doOnCancel(() -> sink = createSink()); // Sink hast to be recreated on auto-cancel!
97   }
98
99   public Flux<Message> getMessages()
100   {
101     return getMessages(0, Long.MAX_VALUE);
102   }
103
104   public Flux<Message> getMessages(long first, long last)
105   {
106     return service.getMessages(first, last);
107   }
108
109   private Sinks.Many<Message> createSink()
110   {
111     return Sinks
112         .many()
113         .multicast()
114         .onBackpressureBuffer(bufferSize);
115   }
116 }