1 package de.juplo.kafka.chatroom.domain;
4 import lombok.RequiredArgsConstructor;
5 import lombok.extern.slf4j.Slf4j;
6 import reactor.core.publisher.Flux;
7 import reactor.core.publisher.Mono;
8 import reactor.core.publisher.Sinks;
10 import java.time.LocalDateTime;
11 import java.util.LinkedList;
12 import java.util.List;
13 import java.util.UUID;
14 import java.util.stream.Stream;
17 @RequiredArgsConstructor
22 private final UUID id;
24 private final String name;
25 private final List<Message> messages = new LinkedList<>();
26 private final Sinks.Many<Message> sink = Sinks.many().multicast().onBackpressureBuffer();
28 synchronized public Mono<Message> addMessage(
30 LocalDateTime timestamp,
34 return persist(id, timestamp, user, text)
35 .doOnNext(message -> sink.tryEmitNext(message).orThrow());
38 private Mono<Message> persistMessage(
40 LocalDateTime timestamp,
44 Message message = new Message(id, (long)messages.size(), timestamp, user, text);
45 messages.add(message);
47 .fromSupplier(() -> message)
51 public Flux<Message> listen()
56 public Stream<Message> getMessages(long firstMessage)
58 return messages.stream().filter(message -> message.getSerialNumber() >= firstMessage);