1 package de.juplo.kafka.chat.backend.domain;
3 import lombok.EqualsAndHashCode;
5 import lombok.ToString;
6 import lombok.extern.slf4j.Slf4j;
7 import reactor.core.publisher.Flux;
8 import reactor.core.publisher.Mono;
9 import reactor.core.publisher.Sinks;
11 import java.time.Clock;
12 import java.time.LocalDateTime;
14 import java.util.regex.Matcher;
15 import java.util.regex.Pattern;
19 @EqualsAndHashCode(of = { "id" })
20 @ToString(of = { "id", "name" })
23 public final static Pattern VALID_USER = Pattern.compile("^[a-z0-9-]{2,}$");
25 private final UUID id;
27 private final String name;
29 private final int shard;
30 private final Clock clock;
31 private final ChatRoomService service;
32 private final int bufferSize;
33 private Sinks.Many<Message> sink;
41 ChatRoomService service,
44 log.info("Created ChatRoom {} with buffer-size {}", id, bufferSize);
49 this.service = service;
50 this.bufferSize = bufferSize;
51 // @RequiredArgsConstructor unfortunately not possible, because
52 // the `bufferSize` is not set, if `createSink()` is called
53 // from the variable declaration!
54 this.sink = createSink();
58 synchronized public Mono<Message> addMessage(
63 Matcher matcher = VALID_USER.matcher(user);
64 if (!matcher.matches())
65 throw new InvalidUsernameException(user);
67 Message.MessageKey key = Message.MessageKey.of(user, id);
70 .flatMap(existing -> text.equals(existing.getMessageText())
72 : Mono.error(() -> new MessageMutationException(existing, text)))
75 .fromSupplier(() ->service.persistMessage(key, LocalDateTime.now(clock), text))
78 Sinks.EmitResult result = sink.tryEmitNext(m);
79 if (result.isFailure())
81 log.warn("Emitting of message failed with {} for {}", result.name(), m);
87 public Mono<Message> getMessage(String username, Long messageId)
89 Message.MessageKey key = Message.MessageKey.of(username, messageId);
90 return service.getMessage(key);
93 synchronized public Flux<Message> listen()
97 .doOnCancel(() -> sink = createSink()); // Sink hast to be recreated on auto-cancel!
100 public Flux<Message> getMessages()
102 return getMessages(0, Long.MAX_VALUE);
105 public Flux<Message> getMessages(long first, long last)
107 return service.getMessages(first, last);
110 private Sinks.Many<Message> createSink()
115 .onBackpressureBuffer(bufferSize);