f87a088350c229f80df55cfe7121c7519170abb4
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chatroom / domain / Chatroom.java
1 package de.juplo.kafka.chatroom.domain;
2
3 import lombok.Getter;
4 import lombok.RequiredArgsConstructor;
5 import lombok.extern.slf4j.Slf4j;
6 import reactor.core.publisher.Flux;
7 import reactor.core.publisher.Mono;
8 import reactor.core.publisher.Sinks;
9
10 import java.time.LocalDateTime;
11 import java.util.LinkedList;
12 import java.util.List;
13 import java.util.UUID;
14 import java.util.stream.Stream;
15
16
17 @RequiredArgsConstructor
18 @Slf4j
19 public class Chatroom
20 {
21   @Getter
22   private final UUID id;
23   @Getter
24   private final String name;
25   private final List<Message> messages = new LinkedList<>();
26   private final Sinks.Many<Message> sink = Sinks.many().multicast().onBackpressureBuffer();
27
28   synchronized public Mono<Message> addMessage(
29       UUID id,
30       LocalDateTime timestamp,
31       String user,
32       String text)
33   {
34     return persistMessage(id, timestamp, user, text)
35         .doOnNext(message -> sink.tryEmitNext(message).orThrow());
36   }
37
38   private Mono<Message> persistMessage(
39       UUID id,
40       LocalDateTime timestamp,
41       String user,
42       String text)
43   {
44     Message message = new Message(id, (long)messages.size(), timestamp, user, text);
45     messages.add(message);
46     return Mono
47         .fromSupplier(() -> message)
48         .log();
49   }
50
51   public Flux<Message> listen()
52   {
53     return sink.asFlux();
54   }
55
56   public Stream<Message> getMessages(long firstMessage)
57   {
58     return messages.stream().filter(message -> message.getSerialNumber() >= firstMessage);
59   }
60 }