1 package de.juplo.kafka.chat.backend.domain;
3 import lombok.extern.slf4j.Slf4j;
4 import reactor.core.publisher.Flux;
5 import reactor.core.publisher.Mono;
6 import reactor.core.publisher.Sinks;
7 import reactor.core.publisher.SynchronousSink;
9 import java.time.Clock;
10 import java.time.LocalDateTime;
12 import java.util.regex.Matcher;
13 import java.util.regex.Pattern;
17 public class ChatRoom extends ChatRoomInfo
19 public final static Pattern VALID_USER = Pattern.compile("^[a-z0-9-]{2,}$");
20 private final Clock clock;
21 private final ChatRoomService service;
22 private final int bufferSize;
23 private Sinks.Many<Message> sink;
31 ChatRoomService service,
34 super(id, name, shard);
35 log.info("Created ChatRoom {} with buffer-size {}", id, bufferSize);
37 this.service = service;
38 this.bufferSize = bufferSize;
39 // @RequiredArgsConstructor unfortunately cannot be used here:
40 // `bufferSize` is not yet set if `createSink()` is called
41 // from the field declaration (initializer)!
42 this.sink = createSink();
46 synchronized public Mono<Message> addMessage(
51 Matcher matcher = VALID_USER.matcher(user);
52 if (!matcher.matches())
53 throw new InvalidUsernameException(user);
55 Message.MessageKey key = Message.MessageKey.of(user, id);
58 .handle((Message existing, SynchronousSink<Message> sink) ->
60 if (existing.getMessageText().equals(text))
66 sink.error(new MessageMutationException(existing, text));
71 .defer(() -> service.persistMessage(key, LocalDateTime.now(clock), text))
74 Sinks.EmitResult result = sink.tryEmitNext(m);
75 if (result.isFailure())
77 log.warn("Emitting of message failed with {} for {}", result.name(), m);
/**
 * Looks up a single message through the {@code ChatRoomService} held by
 * this instance.
 * <p>
 * The two arguments are combined into a {@code Message.MessageKey} via
 * {@code Message.MessageKey.of(username, messageId)}, and the lookup is
 * delegated unchanged to {@code service.getMessage(key)}.
 * No validation of {@code username} is performed here.
 *
 * @param username  first component of the message key
 * @param messageId second component of the message key
 * @return whatever {@code service.getMessage(key)} returns
 *         (NOTE(review): presumably an empty {@code Mono} if no message
 *         exists for the key -- confirm against the service implementation)
 */
83 public Mono<Message> getMessage(String username, Long messageId)
85 Message.MessageKey key = Message.MessageKey.of(username, messageId);
86 return service.getMessage(key);
89 synchronized public Flux<Message> listen()
93 .doOnCancel(() -> sink = createSink()); // Sink has to be recreated on auto-cancel!
/**
 * Convenience overload that requests the widest possible range.
 * <p>
 * Calls {@link #getMessages(long, long)} with {@code first = 0} and
 * {@code last = Long.MAX_VALUE}.
 *
 * @return the {@code Flux} returned by {@code getMessages(0, Long.MAX_VALUE)}
 */
96 public Flux<Message> getMessages()
98 return getMessages(0, Long.MAX_VALUE);
/**
 * Requests a range of messages from the {@code ChatRoomService} held by
 * this instance.
 * <p>
 * Both bounds are passed through unchanged to
 * {@code service.getMessages(first, last)}; no checks are applied here.
 *
 * @param first lower bound handed to the service
 * @param last  upper bound handed to the service
 *              (NOTE(review): whether the bounds are inclusive and what
 *              they index is defined by the service -- not visible here)
 * @return the {@code Flux} returned by {@code service.getMessages(first, last)}
 */
101 public Flux<Message> getMessages(long first, long last)
103 return service.getMessages(first, last);
106 private Sinks.Many<Message> createSink()
111 .onBackpressureBuffer(bufferSize);