1 package de.juplo.kafka.chat.backend.domain;
4 import lombok.RequiredArgsConstructor;
6 import lombok.extern.slf4j.Slf4j;
7 import reactor.core.publisher.Flux;
8 import reactor.core.publisher.Mono;
9 import reactor.core.publisher.Sinks;
11 import java.time.LocalDateTime;
15 @RequiredArgsConstructor
// Identifier of the enclosing object. Final and not initialized inline, so it has to be
// supplied at construction time (presumably through the Lombok-generated constructor; the
// class header is not visible in this excerpt - confirm).
20 private final UUID id;
// Display name; likewise final and not initialized inline.
22 private final String name;
// Messages held in memory, keyed by MessageKey. LinkedHashMap iterates in insertion order.
23 private final LinkedHashMap<MessageKey, Message> messages = new LinkedHashMap<>();
// Multicast sink with a backpressure buffer; addMessage emits every persisted message into it.
24 private final Sinks.Many<Message> sink = Sinks.many().multicast().onBackpressureBuffer();
/**
 * Hands the message data to {@code persistMessage} and, for every message the resulting
 * Mono emits, pushes that message into {@code sink}.
 *
 * The method is synchronized on the enclosing instance. Only part of the parameter list is
 * visible in this excerpt; the visible call forwards id, timestamp, user and text.
 *
 * @param timestamp forwarded unchanged to {@code persistMessage}
 * @return the Mono produced by {@code persistMessage}, with the sink emission attached as a
 *         {@code doOnNext} side effect
 */
26 synchronized public Mono<Message> addMessage(
28 LocalDateTime timestamp,
32 return persistMessage(id, timestamp, user, text)
// orThrow() turns a failed emission into an exception instead of silently dropping the message.
33 .doOnNext(message -> sink.tryEmitNext(message).orThrow());
/**
 * Builds a {@code Message} from the given data and stores it in {@code messages} under a
 * key made of user and id.
 *
 * Several lines of this method are not visible in this excerpt (the rest of the parameter
 * list, the guard around the duplicate handling and the tail of the returned chain), so
 * only the visible statements are described here.
 *
 * @param timestamp passed on to the {@code Message} constructor
 * @return a Mono built from a supplier that yields the newly created message
 * @throws MessageMutationException if the new message is not equal to the one found under
 *         the same key
 */
36 private Mono<Message> persistMessage(
38 LocalDateTime timestamp,
// The current number of stored messages is passed as the second constructor argument -
// presumably the serial number later read via getSerialNumber() (confirm against Message).
42 Message message = new Message(id, (long)messages.size(), timestamp, user, text);
// Look up whatever is already stored under the same (user, id) key.
44 MessageKey key = new MessageKey(user, id);
45 Message existing = messages.get(key);
// Duplicate handling; presumably only reached when `existing` is non-null (guard not visible here).
48 log.info("Message with key {} already exists; {}", key, existing);
// Re-sending an identical message is tolerated, a differing one is rejected.
49 if (!message.equals(existing))
50 throw new MessageMutationException(message, existing);
// Note: the map is modified when this method is called, not when the returned Mono is subscribed.
54 messages.put(key, message);
56 .fromSupplier(() -> message)
/**
 * Looks up a single stored message by user name and message id.
 *
 * The map access happens inside the supplier, i.e. when the returned Mono is subscribed.
 * {@code Mono.fromSupplier} completes empty when the supplier yields {@code null}, which is
 * what {@code messages.get} returns for an unknown key.
 *
 * NOTE(review): unlike addMessage, this method is not declared synchronized - confirm that
 * an unguarded read of {@code messages} is intended.
 *
 * @param username  first component of the lookup key
 * @param messageId second component of the lookup key
 * @return a Mono emitting the stored message, or completing empty if none is stored
 */
60 public Mono<Message> getMessage(String username, Long messageId)
62 return Mono.fromSupplier(() ->
64 MessageKey key = MessageKey.of(username, messageId);
65 return messages.get(key);
// Returns a Flux of messages for subscribers. The body is not visible in this excerpt;
// presumably it exposes `sink` as a Flux - TODO confirm.
69 public Flux<Message> listen()
/**
 * Returns the stored messages whose serial number lies between {@code first} and
 * {@code last}.
 *
 * The intermediate stream steps are not visible in this excerpt; the visible predicate is
 * presumably used as a filter on the stream over {@code messages} (confirm).
 *
 * @param first lowest serial number to include
 * @param last  highest serial number to include
 * @return a Flux created from a Java stream over the stored messages
 */
74 public Flux<Message> getMessages(long first, long last)
76 return Flux.fromStream(messages
81 long serial = message.getSerialNumber();
// Both bounds are inclusive.
82 return serial >= first && serial <= last;
87 @Value(staticConstructor = "of")
88 static class MessageKey