1 package de.juplo.kafka.chatroom.domain;
4 import lombok.RequiredArgsConstructor;
6 import lombok.extern.slf4j.Slf4j;
7 import reactor.core.publisher.Flux;
8 import reactor.core.publisher.Mono;
9 import reactor.core.publisher.Sinks;
11 import java.time.LocalDateTime;
13 import java.util.stream.Stream;
16 @RequiredArgsConstructor
// Identity fields of the enclosing class. Both are final without an initializer, so they are
// presumably filled in by the Lombok-generated required-args constructor — confirm against the class header.
21 private final UUID id;
23 private final String name;
// Messages recorded so far, indexed by MessageKey. LinkedHashMap iterates in insertion order.
24 private final LinkedHashMap<MessageKey, Message> messages = new LinkedHashMap<>();
// Multicast sink with a backpressure buffer; addMessage pushes every new message into it.
25 private final Sinks.Many<Message> sink = Sinks.many().multicast().onBackpressureBuffer();
/**
 * Records a message through persistMessage and publishes it to {@code sink} once the
 * returned Mono emits it.
 *
 * The method is synchronized on this instance. Emission goes through
 * {@code tryEmitNext(...).orThrow()}, so a rejected emission is raised as an exception
 * instead of being dropped silently.
 *
 * NOTE(review): only part of the parameter list is visible in this excerpt; the call below
 * passes (id, timestamp, user, text) — confirm the remaining parameter types.
 * NOTE(review): the doOnNext callback runs when the Mono is subscribed, which may be after
 * this method has returned and released the monitor — confirm that is intended.
 */
27 synchronized public Mono<Message> addMessage(
29 LocalDateTime timestamp,
33 return persistMessage(id, timestamp, user, text)
34 .doOnNext(message -> sink.tryEmitNext(message).orThrow()); // publish to listeners; throws if the sink rejects the message
/**
 * Builds a Message from the arguments, stores it in {@code messages} under a key made of
 * (user, id), and hands it back wrapped with fromSupplier.
 *
 * Throws IllegalArgumentException when the new message is not equal to the entry already
 * stored under the same key.
 *
 * NOTE(review): the guard around the "already exists" branch and whatever follows it
 * (else / early return) are not visible in this excerpt — presumably
 * {@code existing != null}; confirm.
 * NOTE(review): the map is modified when this method is called, not when the returned Mono
 * is subscribed; only the hand-over of the message is deferred.
 */
37 private Mono<Message> persistMessage(
39 LocalDateTime timestamp,
43 Message message = new Message(id, (long)messages.size(), timestamp, user, text); // second argument is the current number of stored messages — presumably the serial number; confirm
45 MessageKey key = new MessageKey(user, id);
46 Message existing = messages.get(key); // null when nothing is stored under this key yet
49 log.info("Message with key {} already exists; {}", key, existing);
// NOTE(review): `message` was just built with messages.size() as its second field; if
// Message.equals compares that field, a re-submitted duplicate can never equal `existing` — verify.
50 if (!message.equals(existing))
51 throw new IllegalArgumentException("Messages are imutable!"); // sic: "imutable" is a typo in the runtime message
55 messages.put(key, message);
57 .fromSupplier(() -> message)
/**
 * Looks up a stored message by username and message id.
 *
 * The lookup is deferred with Mono.fromSupplier, so it runs on subscription. An unknown key
 * makes {@code messages.get} return null, and a null supplier result completes the Mono empty.
 *
 * NOTE(review): unlike addMessage, this method is not synchronized although it reads the
 * same LinkedHashMap — confirm whether concurrent access is possible.
 */
61 public Mono<Message> getMessage(String username, Long messageId)
63 return Mono.fromSupplier(() ->
65 MessageKey key = MessageKey.of(username, messageId);
66 return messages.get(key);
// Returns a Flux of messages. NOTE(review): the body is outside this excerpt — presumably a view of `sink`; confirm.
70 public Flux<Message> listen()
/**
 * Returns a Stream restricted to messages whose serial number is at least
 * {@code firstMessage} (the bound is inclusive).
 *
 * NOTE(review): the source of the stream is not visible in this excerpt — presumably the
 * values of {@code messages}; confirm.
 */
75 public Stream<Message> getMessages(long firstMessage)
80 .filter(message -> message.getSerialNumber() >= firstMessage);
84 @Value(staticConstructor = "of")
85 static class MessageKey