NEU
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / domain / ChatRoom.java
1 package de.juplo.kafka.chat.backend.domain;
2
3 import lombok.extern.slf4j.Slf4j;
4 import reactor.core.publisher.Flux;
5 import reactor.core.publisher.Mono;
6 import reactor.core.publisher.Sinks;
7 import reactor.core.publisher.SynchronousSink;
8
9 import java.time.Clock;
10 import java.time.LocalDateTime;
11 import java.util.*;
12 import java.util.regex.Matcher;
13 import java.util.regex.Pattern;
14
15
16 @Slf4j
17 public class ChatRoom extends ChatRoomInfo
18 {
19   public final static Pattern VALID_USER = Pattern.compile("^[a-z0-9-]{2,}$");
20   private final Clock clock;
21   private final ChatRoomService service;
22   private final int bufferSize;
23   private Sinks.Many<Message> sink;
24
25
26   public ChatRoom(
27       UUID id,
28       String name,
29       int shard,
30       Clock clock,
31       ChatRoomService service,
32       int bufferSize)
33   {
34     super(id, name, shard);
35     log.info("Created ChatRoom {} with buffer-size {}", id, bufferSize);
36     this.clock = clock;
37     this.service = service;
38     this.bufferSize = bufferSize;
39     // @RequiredArgsConstructor unfortunately not possible, because
40     // the `bufferSize` is not set, if `createSink()` is called
41     // from the variable declaration!
42     this.sink = createSink();
43   }
44
45
46   synchronized public Mono<Message> addMessage(
47       Long id,
48       String user,
49       String text)
50   {
51     Matcher matcher = VALID_USER.matcher(user);
52     if (!matcher.matches())
53       throw new InvalidUsernameException(user);
54
55     Message.MessageKey key = Message.MessageKey.of(user, id);
56     return service
57         .getMessage(key)
58         .handle((Message existing, SynchronousSink<Message> sink) ->
59         {
60           if (existing.getMessageText().equals(text))
61           {
62             sink.next(existing);
63           }
64           else
65           {
66             sink.error(new MessageMutationException(existing, text));
67           }
68         })
69         .switchIfEmpty(
70             Mono
71                 .defer(() -> service.persistMessage(key, LocalDateTime.now(clock), text))
72                 .doOnNext(m ->
73                 {
74                   Sinks.EmitResult result = sink.tryEmitNext(m);
75                   if (result.isFailure())
76                   {
77                     log.warn("Emitting of message failed with {} for {}", result.name(), m);
78                   }
79                 }));
80   }
81
82
83   public ChatRoomService getChatRoomService()
84   {
85     return service;
86   }
87
88   public Mono<Message> getMessage(String username, Long messageId)
89   {
90     Message.MessageKey key = Message.MessageKey.of(username, messageId);
91     return service.getMessage(key);
92   }
93
94   synchronized public Flux<Message> listen()
95   {
96     return sink
97         .asFlux()
98         .doOnCancel(() -> sink = createSink()); // Sink hast to be recreated on auto-cancel!
99   }
100
101   public Flux<Message> getMessages()
102   {
103     return getMessages(0, Long.MAX_VALUE);
104   }
105
106   public Flux<Message> getMessages(long first, long last)
107   {
108     return service.getMessages(first, last);
109   }
110
111   private Sinks.Many<Message> createSink()
112   {
113     return Sinks
114         .many()
115         .multicast()
116         .onBackpressureBuffer(bufferSize);
117   }
118 }