fix: Added constraints for valid usernames
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / domain / ChatRoom.java
1 package de.juplo.kafka.chat.backend.domain;
2
3 import lombok.EqualsAndHashCode;
4 import lombok.Getter;
5 import lombok.ToString;
6 import lombok.extern.slf4j.Slf4j;
7 import reactor.core.publisher.Flux;
8 import reactor.core.publisher.Mono;
9 import reactor.core.publisher.Sinks;
10
11 import java.time.Clock;
12 import java.time.LocalDateTime;
13 import java.util.*;
14 import java.util.regex.Matcher;
15 import java.util.regex.Pattern;
16
17
18 @Slf4j
19 @EqualsAndHashCode(of = { "id" })
20 @ToString(of = { "id", "name" })
21 public class ChatRoom
22 {
23   public final static Pattern VALID_USER = Pattern.compile("^[a-z0-9-]{2,}$");
24   @Getter
25   private final UUID id;
26   @Getter
27   private final String name;
28   private final Clock clock;
29   private final ChatRoomService service;
30   private final int bufferSize;
31   private Sinks.Many<Message> sink;
32
33   public ChatRoom(
34       UUID id,
35       String name,
36       Clock clock,
37       ChatRoomService service,
38       int bufferSize)
39   {
40     this.id = id;
41     this.name = name;
42     this.clock = clock;
43     this.service = service;
44     this.bufferSize = bufferSize;
45     this.sink = createSink();
46   }
47
48
49   synchronized public Mono<Message> addMessage(
50       Long id,
51       String user,
52       String text)
53   {
54     Matcher matcher = VALID_USER.matcher(user);
55     if (!matcher.matches())
56       throw new InvalidUsernameException(user);
57
58     Message.MessageKey key = Message.MessageKey.of(user, id);
59     return service
60         .getMessage(key)
61         .flatMap(existing -> text.equals(existing.getMessageText())
62             ? Mono.just(existing)
63             : Mono.error(() -> new MessageMutationException(existing, text)))
64         .switchIfEmpty(
65             Mono
66                 .fromSupplier(() ->service.persistMessage(key, LocalDateTime.now(clock), text))
67                 .doOnNext(m ->
68                 {
69                   Sinks.EmitResult result = sink.tryEmitNext(m);
70                   if (result.isFailure())
71                   {
72                     log.warn("Emitting of message failed with {} for {}", result.name(), m);
73                   }
74                 }));
75   }
76
77
78   public Mono<Message> getMessage(String username, Long messageId)
79   {
80     Message.MessageKey key = Message.MessageKey.of(username, messageId);
81     return service.getMessage(key);
82   }
83
84   synchronized public Flux<Message> listen()
85   {
86     return sink
87         .asFlux()
88         .doOnCancel(() -> sink = createSink()); // Sink hast to be recreated on auto-cancel!
89   }
90
91   public Flux<Message> getMessages()
92   {
93     return getMessages(0, Long.MAX_VALUE);
94   }
95
96   public Flux<Message> getMessages(long first, long last)
97   {
98     return service.getMessages(first, last);
99   }
100
101   private Sinks.Many<Message> createSink()
102   {
103     return Sinks
104         .many()
105         .multicast()
106         .onBackpressureBuffer(bufferSize);
107   }
108 }