1 package de.juplo.kafka.chat.backend.domain;
3 import lombok.EqualsAndHashCode;
5 import lombok.ToString;
6 import lombok.extern.slf4j.Slf4j;
7 import reactor.core.publisher.Flux;
8 import reactor.core.publisher.Mono;
9 import reactor.core.publisher.Sinks;
11 import java.time.Clock;
12 import java.time.LocalDateTime;
17 @EqualsAndHashCode(of = { "id" })
18 @ToString(of = { "id", "name" })
22 private final UUID id;
24 private final String name;
25 private final Clock clock;
26 private final ChatRoomService service;
27 private final int bufferSize;
28 private Sinks.Many<Message> sink;
34 ChatRoomService service,
40 this.service = service;
41 this.bufferSize = bufferSize;
42 this.sink = createSink();
46 synchronized public Mono<Message> addMessage(
52 .persistMessage(Message.MessageKey.of(user, id), LocalDateTime.now(clock), text)
55 Sinks.EmitResult result = sink.tryEmitNext(message);
56 if (result.isFailure())
58 log.warn("Emitting of message failed with {} for {}", result.name(), message);
64 public Mono<Message> getMessage(String username, Long messageId)
66 Message.MessageKey key = Message.MessageKey.of(username, messageId);
67 return service.getMessage(key);
70 synchronized public Flux<Message> listen()
74 .doOnCancel(() -> sink = createSink()); // Sink hast to be recreated on auto-cancel!
77 public Flux<Message> getMessages()
79 return getMessages(0, Long.MAX_VALUE);
82 public Flux<Message> getMessages(long first, long last)
84 return service.getMessages(first, last);
87 private Sinks.Many<Message> createSink()
92 .onBackpressureBuffer(bufferSize);