refactor: Renamed `ChatroomService` to `ChatRoomService` -- Rename
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / domain / ChatRoom.java
1 package de.juplo.kafka.chat.backend.domain;
2
3 import lombok.Getter;
4 import lombok.extern.slf4j.Slf4j;
5 import reactor.core.publisher.Flux;
6 import reactor.core.publisher.Mono;
7 import reactor.core.publisher.Sinks;
8
9 import java.time.LocalDateTime;
10 import java.util.*;
11
12
13 @Slf4j
14 public class ChatRoom
15 {
16   @Getter
17   private final UUID id;
18   @Getter
19   private final String name;
20   private final ChatRoomService service;
21   private final int bufferSize;
22   private Sinks.Many<Message> sink;
23
24   public ChatRoom(
25       UUID id,
26       String name,
27       ChatRoomService service,
28       int bufferSize)
29   {
30     this.id = id;
31     this.name = name;
32     this.service = service;
33     this.bufferSize = bufferSize;
34     this.sink = createSink();
35   }
36
37
38   synchronized public Mono<Message> addMessage(
39       Long id,
40       LocalDateTime timestamp,
41       String user,
42       String text)
43   {
44     return service
45         .persistMessage(Message.MessageKey.of(user, id), timestamp, text)
46         .doOnNext(message ->
47         {
48           Sinks.EmitResult result = sink.tryEmitNext(message);
49           if (result.isFailure())
50           {
51             log.warn("Emitting of message failed with {} for {}", result.name(), message);
52           }
53         });
54   }
55
56
57   public Mono<Message> getMessage(String username, Long messageId)
58   {
59     Message.MessageKey key = Message.MessageKey.of(username, messageId);
60     return service.getMessage(key);
61   }
62
63   synchronized public Flux<Message> listen()
64   {
65     return sink
66         .asFlux()
67         .doOnCancel(() -> sink = createSink()); // Sink hast to be recreated on auto-cancel!
68   }
69
70   public Flux<Message> getMessages()
71   {
72     return getMessages(0, Long.MAX_VALUE);
73   }
74
75   public Flux<Message> getMessages(long first, long last)
76   {
77     return service.getMessages(first, last);
78   }
79
80   private Sinks.Many<Message> createSink()
81   {
82     return Sinks
83         .many()
84         .multicast()
85         .onBackpressureBuffer(bufferSize);
86   }
87 }