1 package de.juplo.kafka.chat.backend.domain;
4 import lombok.RequiredArgsConstructor;
5 import lombok.extern.slf4j.Slf4j;
6 import reactor.core.publisher.Flux;
7 import reactor.core.publisher.Mono;
8 import reactor.core.publisher.Sinks;
10 import java.time.LocalDateTime;
14 @RequiredArgsConstructor
19 private final UUID id;
21 private final String name;
22 private final LinkedHashMap<Message.MessageKey, Message> messages = new LinkedHashMap<>();
23 private final Sinks.Many<Message> sink = Sinks.many().multicast().onBackpressureBuffer();
25 synchronized public Mono<Message> addMessage(
27 LocalDateTime timestamp,
31 return persistMessage(id, timestamp, user, text)
32 .doOnNext(message -> sink.tryEmitNext(message).orThrow());
35 private Mono<Message> persistMessage(
37 LocalDateTime timestamp,
41 Message.MessageKey key = Message.MessageKey.of(user, id);
42 Message message = new Message(key, (long)messages.size(), timestamp, text);
44 Message existing = messages.get(key);
47 log.info("Message with key {} already exists; {}", key, existing);
48 if (!message.equals(existing))
49 throw new MessageMutationException(message, existing);
53 messages.put(key, message);
55 .fromSupplier(() -> message)
59 public Mono<Message> getMessage(String username, Long messageId)
61 return Mono.fromSupplier(() ->
63 Message.MessageKey key = Message.MessageKey.of(username, messageId);
64 return messages.get(key);
68 public Flux<Message> listen()
73 public Flux<Message> getMessages(long first, long last)
75 return Flux.fromStream(messages
80 long serial = message.getSerialNumber();
81 return serial >= first && serial <= last;