1 package de.juplo.kafka.chat.backend.domain;
4 import lombok.RequiredArgsConstructor;
5 import lombok.extern.slf4j.Slf4j;
6 import reactor.core.publisher.Flux;
7 import reactor.core.publisher.Mono;
8 import reactor.core.publisher.Sinks;
10 import java.time.LocalDateTime;
14 @RequiredArgsConstructor
// Identifier of this instance (a UUID).
19 private final UUID id;
// Name associated with this instance.
21 private final String name;
// Storage backend that message writes and reads in this class are delegated to.
22 private final PersistenceStrategy persistence;
// Multicast sink with a backpressure buffer, created inline; addMessage emits
// each message it receives from the persistence call into this sink.
23 private final Sinks.Many<Message> sink = Sinks.many().multicast().onBackpressureBuffer();
/**
 * Persists a message and publishes it to the sink.
 *
 * <p>The visible code calls {@code persistMessage} with a
 * {@code Message.MessageKey} built from {@code user} and {@code id}, together
 * with {@code timestamp} and {@code text}. Every message emitted by that call
 * is pushed into {@code sink} via {@code tryEmitNext}; {@code orThrow()} turns
 * a failed emission into an exception.
 *
 * <p>Only the {@code timestamp} parameter is visible in this excerpt; the
 * declarations of {@code user}, {@code text} and the receiver of
 * {@code persistMessage} are not shown here (presumably {@code persistence} -
 * TODO confirm).
 *
 * <p>NOTE(review): the method is {@code synchronized}, but the
 * {@code doOnNext} callback is registered here and invoked later, when the
 * returned {@code Mono} emits. The monitor therefore does not appear to
 * serialize the {@code tryEmitNext} calls themselves - confirm whether
 * concurrent emissions can occur and make {@code orThrow()} fail.
 *
 * @param timestamp timestamp passed through to {@code persistMessage}
 * @return the {@code Mono} produced by the persistence call, with the sink
 *         emission attached as a side effect
 */
25 synchronized public Mono<Message> addMessage(
27 LocalDateTime timestamp,
32 .persistMessage(Message.MessageKey.of(user, id), timestamp, text)
33 .doOnNext(message -> sink.tryEmitNext(message).orThrow());
/**
 * Looks up a single message through the persistence strategy.
 *
 * <p>Builds a {@code Message.MessageKey} from the two arguments and delegates
 * to {@code persistence.getMessage}; no additional logic is applied here.
 *
 * @param username  first component of the message key
 * @param messageId second component of the message key
 * @return whatever {@code persistence.getMessage} returns for that key
 */
37 public Mono<Message> getMessage(String username, Long messageId)
39 return persistence.getMessage(Message.MessageKey.of(username, messageId));
/**
 * Returns a stream of messages.
 *
 * <p>NOTE(review): the method body is not visible in this excerpt. Presumably
 * the returned {@code Flux} is backed by {@code sink}, so that subscribers
 * receive messages published by {@code addMessage} - confirm against the
 * full source.
 *
 * @return a {@code Flux} of messages
 */
42 public Flux<Message> listen()
/**
 * Returns the messages for the widest range this class can request.
 *
 * <p>Convenience overload: delegates to {@code getMessages(long, long)} with
 * {@code 0} as the first and {@code Long.MAX_VALUE} as the last bound.
 *
 * @return the result of {@code getMessages(0, Long.MAX_VALUE)}
 */
47 public Flux<Message> getMessages()
49 return getMessages(0, Long.MAX_VALUE);
/**
 * Returns the messages in the given range, as supplied by the persistence
 * strategy.
 *
 * <p>Pure delegation to {@code persistence.getMessages(first, last)}. How the
 * bounds are interpreted (for example, whether they are inclusive) is decided
 * by the persistence strategy and is not visible here.
 *
 * @param first lower bound handed to the persistence strategy
 * @param last  upper bound handed to the persistence strategy
 * @return the {@code Flux} returned by {@code persistence.getMessages}
 */
52 public Flux<Message> getMessages(long first, long last)
54 return persistence.getMessages(first, last);