refactor: Refined packaging (renamed packages and classes)
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / domain / Chatroom.java
1 package de.juplo.kafka.chat.backend.domain;
2
3 import lombok.Getter;
4 import lombok.RequiredArgsConstructor;
5 import lombok.Value;
6 import lombok.extern.slf4j.Slf4j;
7 import reactor.core.publisher.Flux;
8 import reactor.core.publisher.Mono;
9 import reactor.core.publisher.Sinks;
10
11 import java.time.LocalDateTime;
12 import java.util.*;
13 import java.util.stream.Stream;
14
15
16 @RequiredArgsConstructor
17 @Slf4j
18 public class Chatroom
19 {
20   @Getter
21   private final UUID id;
22   @Getter
23   private final String name;
24   private final LinkedHashMap<MessageKey, Message> messages = new LinkedHashMap<>();
25   private final Sinks.Many<Message> sink = Sinks.many().multicast().onBackpressureBuffer();
26
27   synchronized public Mono<Message> addMessage(
28       Long id,
29       LocalDateTime timestamp,
30       String user,
31       String text)
32   {
33     return persistMessage(id, timestamp, user, text)
34         .doOnNext(message -> sink.tryEmitNext(message).orThrow());
35   }
36
37   private Mono<Message> persistMessage(
38       Long id,
39       LocalDateTime timestamp,
40       String user,
41       String text)
42   {
43     Message message = new Message(id, (long)messages.size(), timestamp, user, text);
44
45     MessageKey key = new MessageKey(user, id);
46     Message existing = messages.get(key);
47     if (existing != null)
48     {
49       log.info("Message with key {} already exists; {}", key, existing);
50       if (!message.equals(existing))
51         throw new MessageMutationException(message, existing);
52       return Mono.empty();
53     }
54
55     messages.put(key, message);
56     return Mono
57         .fromSupplier(() -> message)
58         .log();
59   }
60
61   public Mono<Message> getMessage(String username, Long messageId)
62   {
63     return Mono.fromSupplier(() ->
64     {
65       MessageKey key = MessageKey.of(username, messageId);
66       return messages.get(key);
67     });
68   }
69
70   public Flux<Message> listen()
71   {
72     return sink.asFlux();
73   }
74
75   public Stream<Message> getMessages(long firstMessage)
76   {
77     return messages
78         .values()
79         .stream()
80         .filter(message -> message.getSerialNumber() >= firstMessage);
81   }
82
83
84   @Value(staticConstructor = "of")
85   static class MessageKey
86   {
87     String username;
88     Long messageId;
89   }
90 }