refactor: A `Message` now explicitly has a `MessageKey`
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / domain / Chatroom.java
1 package de.juplo.kafka.chat.backend.domain;
2
3 import lombok.Getter;
4 import lombok.RequiredArgsConstructor;
5 import lombok.extern.slf4j.Slf4j;
6 import reactor.core.publisher.Flux;
7 import reactor.core.publisher.Mono;
8 import reactor.core.publisher.Sinks;
9
10 import java.time.LocalDateTime;
11 import java.util.*;
12
13
14 @RequiredArgsConstructor
15 @Slf4j
16 public class Chatroom
17 {
18   @Getter
19   private final UUID id;
20   @Getter
21   private final String name;
22   private final LinkedHashMap<Message.MessageKey, Message> messages = new LinkedHashMap<>();
23   private final Sinks.Many<Message> sink = Sinks.many().multicast().onBackpressureBuffer();
24
25   synchronized public Mono<Message> addMessage(
26       Long id,
27       LocalDateTime timestamp,
28       String user,
29       String text)
30   {
31     return persistMessage(id, timestamp, user, text)
32         .doOnNext(message -> sink.tryEmitNext(message).orThrow());
33   }
34
35   private Mono<Message> persistMessage(
36       Long id,
37       LocalDateTime timestamp,
38       String user,
39       String text)
40   {
41     Message.MessageKey key = Message.MessageKey.of(user, id);
42     Message message = new Message(key, (long)messages.size(), timestamp, text);
43
44     Message existing = messages.get(key);
45     if (existing != null)
46     {
47       log.info("Message with key {} already exists; {}", key, existing);
48       if (!message.equals(existing))
49         throw new MessageMutationException(message, existing);
50       return Mono.empty();
51     }
52
53     messages.put(key, message);
54     return Mono
55         .fromSupplier(() -> message)
56         .log();
57   }
58
59   public Mono<Message> getMessage(String username, Long messageId)
60   {
61     return Mono.fromSupplier(() ->
62     {
63       Message.MessageKey key = Message.MessageKey.of(username, messageId);
64       return messages.get(key);
65     });
66   }
67
68   public Flux<Message> listen()
69   {
70     return sink.asFlux();
71   }
72
73   public Flux<Message> getMessages(long first, long last)
74   {
75     return Flux.fromStream(messages
76         .values()
77         .stream()
78         .filter(message ->
79         {
80           long serial = message.getSerialNumber();
81           return serial >= first && serial <= last;
82         }));
83   }
84 }