refactor: The `ChatRoom` determines the timestamp of a `Message`
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / domain / ChatRoom.java
1 package de.juplo.kafka.chat.backend.domain;
2
3 import lombok.Getter;
4 import lombok.extern.slf4j.Slf4j;
5 import reactor.core.publisher.Flux;
6 import reactor.core.publisher.Mono;
7 import reactor.core.publisher.Sinks;
8
9 import java.time.Clock;
10 import java.time.LocalDateTime;
11 import java.util.*;
12
13
14 @Slf4j
15 public class ChatRoom
16 {
17   @Getter
18   private final UUID id;
19   @Getter
20   private final String name;
21   private final Clock clock;
22   private final ChatRoomService service;
23   private final int bufferSize;
24   private Sinks.Many<Message> sink;
25
26   public ChatRoom(
27       UUID id,
28       String name,
29       Clock clock,
30       ChatRoomService service,
31       int bufferSize)
32   {
33     this.id = id;
34     this.name = name;
35     this.clock = clock;
36     this.service = service;
37     this.bufferSize = bufferSize;
38     this.sink = createSink();
39   }
40
41
42   synchronized public Mono<Message> addMessage(
43       Long id,
44       String user,
45       String text)
46   {
47     return service
48         .persistMessage(Message.MessageKey.of(user, id), LocalDateTime.now(clock), text)
49         .doOnNext(message ->
50         {
51           Sinks.EmitResult result = sink.tryEmitNext(message);
52           if (result.isFailure())
53           {
54             log.warn("Emitting of message failed with {} for {}", result.name(), message);
55           }
56         });
57   }
58
59
60   public Mono<Message> getMessage(String username, Long messageId)
61   {
62     Message.MessageKey key = Message.MessageKey.of(username, messageId);
63     return service.getMessage(key);
64   }
65
66   synchronized public Flux<Message> listen()
67   {
68     return sink
69         .asFlux()
70         .doOnCancel(() -> sink = createSink()); // Sink hast to be recreated on auto-cancel!
71   }
72
73   public Flux<Message> getMessages()
74   {
75     return getMessages(0, Long.MAX_VALUE);
76   }
77
78   public Flux<Message> getMessages(long first, long last)
79   {
80     return service.getMessages(first, last);
81   }
82
83   private Sinks.Many<Message> createSink()
84   {
85     return Sinks
86         .many()
87         .multicast()
88         .onBackpressureBuffer(bufferSize);
89   }
90 }