1 package de.juplo.kafka.chat.backend.domain;
3 import lombok.extern.slf4j.Slf4j;
4 import reactor.core.publisher.Flux;
5 import reactor.core.publisher.Mono;
6 import reactor.core.publisher.Sinks;
7 import reactor.core.publisher.SynchronousSink;
9 import java.time.Clock;
10 import java.time.LocalDateTime;
11 import java.util.regex.Matcher;
12 import java.util.regex.Pattern;
/**
 * Pattern a username has to match: two or more characters, each of which is
 * a lowercase ASCII letter ({@code a-z}), a digit ({@code 0-9}) or a hyphen.
 * It is applied with {@code matches()} in {@code addMessage()}.
 */
18 public final static Pattern VALID_USER = Pattern.compile("^[a-z0-9-]{2,}$");
// Time source for the timestamp handed to the service when a message is persisted.
20 private final Clock clock;
// Backing service that the persisting and reading of messages is delegated to.
21 private final ChatRoomService service;
// Capacity passed to onBackpressureBuffer() whenever a sink is created.
22 private final int bufferSize;
// Not final: assigned in the constructor and replaced with a fresh sink
// from the cancel-callback that is registered in listen().
23 private Sinks.Many<Message> sink;
28 ChatRoomService service,
31 log.info("Created ChatRoom with buffer-size {}", bufferSize);
33 this.service = service;
34 this.bufferSize = bufferSize;
35 // @RequiredArgsConstructor can unfortunately not be used here:
36 // `bufferSize` would not be set yet, if `createSink()` were called
37 // from the field initializer of `sink`!
38 this.sink = createSink();
/**
 * Adds a message to the chat room.
 * <p>
 * The username is validated first. Afterwards a reactive chain is built that
 * handles an already existing message for the same key, persists the message
 * through the service and tries to emit it on the sink.
 * <p>
 * NOTE(review): the parameter list and several lines of the reactive chain
 * are not visible here; the visible code uses {@code user}, {@code id} and
 * {@code text}. Verify the details against the complete method.
 *
 * @return a {@link Mono} for the message
 * @throws InvalidUsernameException if {@code user} does not match
 *     {@link #VALID_USER}; raised by a plain {@code throw}, before the
 *     reactive chain is assembled
 */
42 synchronized public Mono<Message> addMessage(
// Reject usernames that do not match VALID_USER.
47 Matcher matcher = VALID_USER.matcher(user);
48 if (!matcher.matches())
49 throw new InvalidUsernameException(user);
// A message is identified by the combination of username and id.
51 Message.MessageKey key = Message.MessageKey.of(user, id);
// Handles a message that already exists for the key.
// Note: the lambda parameter `sink` shadows the field `sink` inside this lambda.
54 .handle((Message existing, SynchronousSink<Message> sink) ->
// Same text as the existing message (the branch body is not visible here;
// presumably the existing message is passed on -- TODO confirm).
56 if (existing.getMessageText().equals(text))
// Different text for an existing key: signal an error instead of mutating the message.
62 sink.error(new MessageMutationException(existing, text));
// Deferred, so that persisting and taking the timestamp from the clock only
// happen on subscription (presumably the fallback if no message exists -- TODO confirm).
67 .defer(() -> service.persistMessage(key, LocalDateTime.now(clock), text))
// Here `sink` is the field (Sinks.Many): try to publish the message.
70 Sinks.EmitResult result = sink.tryEmitNext(m);
71 if (result.isFailure())
// A failed emission is logged as a warning.
73 log.warn("Emitting of message failed with {} for {}", result.name(), m);
79 public ChatRoomService getChatRoomService()
/**
 * Looks up a single message.
 * <p>
 * Builds the message key from the given username and message id and
 * delegates the lookup to the {@link ChatRoomService}.
 *
 * @param username the username part of the message key
 * @param messageId the id part of the message key
 * @return the {@link Mono} returned by the service for that key
 */
84 public Mono<Message> getMessage(String username, Long messageId)
86 Message.MessageKey key = Message.MessageKey.of(username, messageId);
87 return service.getMessage(key);
/**
 * Returns a {@link Flux} of messages for a listener.
 * <p>
 * NOTE(review): the beginning of the returned chain is not visible here;
 * presumably it is derived from the current {@code sink} -- TODO confirm.
 * <p>
 * NOTE(review): the cancel-callback replaces the {@code sink} field when the
 * flux is cancelled, that is, possibly after this synchronized method has
 * returned -- verify that this reassignment is safe with respect to
 * {@code addMessage()}.
 *
 * @return the flux of messages
 */
90 synchronized public Flux<Message> listen()
94 .doOnCancel(() -> sink = createSink()); // The sink has to be recreated on auto-cancel!
/**
 * Returns the messages for the widest possible range.
 * <p>
 * Shorthand for {@code getMessages(0, Long.MAX_VALUE)}.
 *
 * @return the {@link Flux} returned by {@code getMessages(long, long)}
 */
97 public Flux<Message> getMessages()
99 return getMessages(0, Long.MAX_VALUE);
/**
 * Returns the messages for the given range by delegating to the
 * {@link ChatRoomService}.
 * <p>
 * NOTE(review): what {@code first} and {@code last} refer to and whether the
 * bounds are inclusive is defined by the service and cannot be told from
 * this method -- TODO confirm.
 *
 * @param first the lower bound, passed on to the service unchanged
 * @param last the upper bound, passed on to the service unchanged
 * @return the {@link Flux} returned by the service
 */
102 public Flux<Message> getMessages(long first, long last)
104 return service.getMessages(first, last);
/**
 * Creates a new sink for the messages, with a backpressure-buffer of
 * {@code bufferSize} elements.
 * <p>
 * NOTE(review): the beginning of the builder chain is not visible here, so
 * the kind of sink (for example multicast) cannot be told -- TODO confirm.
 * Reads the field {@code bufferSize}, which therefore has to be set before
 * this method is called.
 *
 * @return the newly created sink
 */
107 private Sinks.Many<Message> createSink()
112 .onBackpressureBuffer(bufferSize);