refactor: Only `LocalJsonFileStorageStrategy` restores `Chatroom`s
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / domain / Chatroom.java
1 package de.juplo.kafka.chat.backend.domain;
2
3 import lombok.Getter;
4 import lombok.extern.slf4j.Slf4j;
5 import reactor.core.publisher.Flux;
6 import reactor.core.publisher.Mono;
7 import reactor.core.publisher.Sinks;
8
9 import java.time.LocalDateTime;
10 import java.util.*;
11
12
13 @Slf4j
14 public class Chatroom
15 {
16   @Getter
17   private final UUID id;
18   @Getter
19   private final String name;
20   private final PersistenceStrategy persistence;
21   private final int bufferSize;
22   private Sinks.Many<Message> sink;
23
24   public Chatroom(
25       UUID id,
26       String name,
27       PersistenceStrategy persistence,
28       int bufferSize)
29   {
30     this.id = id;
31     this.name = name;
32     this.persistence = persistence;
33     this.bufferSize = bufferSize;
34     this.sink = createSink();
35   }
36
37
38   synchronized public Mono<Message> addMessage(
39       Long id,
40       LocalDateTime timestamp,
41       String user,
42       String text)
43   {
44     return persistence
45         .persistMessage(Message.MessageKey.of(user, id), timestamp, text)
46         .doOnNext(message ->
47         {
48           Sinks.EmitResult result = sink.tryEmitNext(message);
49           if (result.isFailure())
50           {
51             log.warn("Emitting of message failed with {} for {}", result.name(), message);
52           }
53         });
54   }
55
56
57   public Mono<Message> getMessage(String username, Long messageId)
58   {
59     return persistence.getMessage(Message.MessageKey.of(username, messageId));
60   }
61
62   synchronized public Flux<Message> listen()
63   {
64     return sink
65         .asFlux()
66         .doOnCancel(() -> sink = createSink()); // Sink hast to be recreated on auto-cancel!
67   }
68
69   public Flux<Message> getMessages()
70   {
71     return getMessages(0, Long.MAX_VALUE);
72   }
73
74   public Flux<Message> getMessages(long first, long last)
75   {
76     return persistence.getMessages(first, last);
77   }
78
79   private Sinks.Many<Message> createSink()
80   {
81     return Sinks
82         .many()
83         .multicast()
84         .onBackpressureBuffer(bufferSize);
85   }
86 }