refactor: `ChatRoomService.persistMessage(..)` returns a `Mono<Message>`
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / domain / ChatRoom.java
1 package de.juplo.kafka.chat.backend.domain;
2
3 import lombok.EqualsAndHashCode;
4 import lombok.Getter;
5 import lombok.ToString;
6 import lombok.extern.slf4j.Slf4j;
7 import reactor.core.publisher.Flux;
8 import reactor.core.publisher.Mono;
9 import reactor.core.publisher.Sinks;
10 import reactor.core.publisher.SynchronousSink;
11
12 import java.time.Clock;
13 import java.time.LocalDateTime;
14 import java.util.*;
15 import java.util.regex.Matcher;
16 import java.util.regex.Pattern;
17
18
19 @Slf4j
20 @EqualsAndHashCode(of = { "id" })
21 @ToString(of = { "id", "name" })
22 public class ChatRoom
23 {
24   public final static Pattern VALID_USER = Pattern.compile("^[a-z0-9-]{2,}$");
25   @Getter
26   private final UUID id;
27   @Getter
28   private final String name;
29   @Getter
30   private final int shard;
31   private final Clock clock;
32   private final ChatRoomService service;
33   private final int bufferSize;
34   private Sinks.Many<Message> sink;
35
36
37   public ChatRoom(
38       UUID id,
39       String name,
40       int shard,
41       Clock clock,
42       ChatRoomService service,
43       int bufferSize)
44   {
45     log.info("Created ChatRoom {} with buffer-size {}", id, bufferSize);
46     this.id = id;
47     this.name = name;
48     this.shard = shard;
49     this.clock = clock;
50     this.service = service;
51     this.bufferSize = bufferSize;
52     // @RequiredArgsConstructor unfortunately not possible, because
53     // the `bufferSize` is not set, if `createSink()` is called
54     // from the variable declaration!
55     this.sink = createSink();
56   }
57
58
59   synchronized public Mono<Message> addMessage(
60       Long id,
61       String user,
62       String text)
63   {
64     Matcher matcher = VALID_USER.matcher(user);
65     if (!matcher.matches())
66       throw new InvalidUsernameException(user);
67
68     Message.MessageKey key = Message.MessageKey.of(user, id);
69     return service
70         .getMessage(key)
71         .handle((Message existing, SynchronousSink<Message> sink) ->
72         {
73           if (existing.getMessageText().equals(text))
74           {
75             sink.next(existing);
76           }
77           else
78           {
79             sink.error(new MessageMutationException(existing, text));
80           }
81         })
82         .switchIfEmpty(
83             Mono
84                 .defer(() -> service.persistMessage(key, LocalDateTime.now(clock), text))
85                 .doOnNext(m ->
86                 {
87                   Sinks.EmitResult result = sink.tryEmitNext(m);
88                   if (result.isFailure())
89                   {
90                     log.warn("Emitting of message failed with {} for {}", result.name(), m);
91                   }
92                 }));
93   }
94
95
96   public Mono<Message> getMessage(String username, Long messageId)
97   {
98     Message.MessageKey key = Message.MessageKey.of(username, messageId);
99     return service.getMessage(key);
100   }
101
102   synchronized public Flux<Message> listen()
103   {
104     return sink
105         .asFlux()
106         .doOnCancel(() -> sink = createSink()); // Sink hast to be recreated on auto-cancel!
107   }
108
109   public Flux<Message> getMessages()
110   {
111     return getMessages(0, Long.MAX_VALUE);
112   }
113
114   public Flux<Message> getMessages(long first, long last)
115   {
116     return service.getMessages(first, last);
117   }
118
119   private Sinks.Many<Message> createSink()
120   {
121     return Sinks
122         .many()
123         .multicast()
124         .onBackpressureBuffer(bufferSize);
125   }
126 }