1 package de.juplo.kafka.chat.backend.domain;
4 import lombok.extern.slf4j.Slf4j;
5 import reactor.core.publisher.Flux;
6 import reactor.core.publisher.Mono;
7 import reactor.core.publisher.Sinks;
9 import java.time.LocalDateTime;
17 private final UUID id;
19 private final String name;
20 private final ChatroomService chatroomService;
21 private final int bufferSize;
22 private Sinks.Many<Message> sink;
27 ChatroomService chatroomService,
32 this.chatroomService = chatroomService;
33 this.bufferSize = bufferSize;
34 this.sink = createSink();
38 synchronized public Mono<Message> addMessage(
40 LocalDateTime timestamp,
44 return chatroomService
45 .persistMessage(Message.MessageKey.of(user, id), timestamp, text)
48 Sinks.EmitResult result = sink.tryEmitNext(message);
49 if (result.isFailure())
51 log.warn("Emitting of message failed with {} for {}", result.name(), message);
57 public Mono<Message> getMessage(String username, Long messageId)
59 Message.MessageKey key = Message.MessageKey.of(username, messageId);
60 return chatroomService.getMessage(key);
63 synchronized public Flux<Message> listen()
67 .doOnCancel(() -> sink = createSink()); // Sink hast to be recreated on auto-cancel!
70 public Flux<Message> getMessages()
72 return getMessages(0, Long.MAX_VALUE);
75 public Flux<Message> getMessages(long first, long last)
77 return chatroomService.getMessages(first, last);
80 private Sinks.Many<Message> createSink()
85 .onBackpressureBuffer(bufferSize);