1 package de.juplo.kafka.chat.backend.domain;
3 import lombok.EqualsAndHashCode;
5 import lombok.ToString;
6 import lombok.extern.slf4j.Slf4j;
7 import reactor.core.publisher.Flux;
8 import reactor.core.publisher.Mono;
9 import reactor.core.publisher.Sinks;
10 import reactor.core.publisher.SynchronousSink;
12 import java.time.Clock;
13 import java.time.LocalDateTime;
15 import java.util.regex.Matcher;
16 import java.util.regex.Pattern;
20 @EqualsAndHashCode(of = { "id" })
21 @ToString(of = { "id", "name" })
24 public final static Pattern VALID_USER = Pattern.compile("^[a-z0-9-]{2,}$"); // accepted user names: two or more characters out of a-z, 0-9 and '-'
26 private final UUID id; // the only field named in @EqualsAndHashCode(of = ...)
28 private final String name; // named in @ToString(of = ...) together with id
30 private final int shard; // NOTE(review): not read in the visible code; presumably the shard this room is assigned to - confirm
31 private final Clock clock; // source of the timestamp handed to service.persistMessage()
32 private final ChatRoomService service; // delegate used to persist and to look up messages
33 private final int bufferSize; // passed to onBackpressureBuffer() whenever a sink is created
34 private Sinks.Many<Message> sink; // not final: replaced by a fresh sink in the doOnCancel callback of listen()
42 ChatRoomService service,
45 log.info("Created ChatRoom {} with buffer-size {}", id, bufferSize);
50 this.service = service;
51 this.bufferSize = bufferSize;
52 // @RequiredArgsConstructor is unfortunately not possible here, because
53 // `bufferSize` would not yet be set if `createSink()` were called
54 // from the field initializer!
55 this.sink = createSink();
59 synchronized public Mono<Message> addMessage(
64 Matcher matcher = VALID_USER.matcher(user);
65 if (!matcher.matches())
66 throw new InvalidUsernameException(user);
68 Message.MessageKey key = Message.MessageKey.of(user, id);
71 .handle((Message existing, SynchronousSink<Message> sink) ->
73 if (existing.getMessageText().equals(text))
79 sink.error(new MessageMutationException(existing, text));
84 .fromSupplier(() ->service.persistMessage(key, LocalDateTime.now(clock), text))
87 Sinks.EmitResult result = sink.tryEmitNext(m);
88 if (result.isFailure())
90 log.warn("Emitting of message failed with {} for {}", result.name(), m);
96 public Mono<Message> getMessage(String username, Long messageId)
98 Message.MessageKey key = Message.MessageKey.of(username, messageId);
99 return service.getMessage(key);
102 synchronized public Flux<Message> listen()
106 .doOnCancel(() -> sink = createSink()); // Sink has to be recreated on auto-cancel!
109 public Flux<Message> getMessages()
111 return getMessages(0, Long.MAX_VALUE);
114 public Flux<Message> getMessages(long first, long last)
116 return service.getMessages(first, last);
119 private Sinks.Many<Message> createSink()
124 .onBackpressureBuffer(bufferSize);