refactor: Moved business-logic from `ChatRoomService` into `ChatRoom`
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / domain / ChatRoom.java
1 package de.juplo.kafka.chat.backend.domain;
2
3 import lombok.EqualsAndHashCode;
4 import lombok.Getter;
5 import lombok.ToString;
6 import lombok.extern.slf4j.Slf4j;
7 import reactor.core.publisher.Flux;
8 import reactor.core.publisher.Mono;
9 import reactor.core.publisher.Sinks;
10
11 import java.time.Clock;
12 import java.time.LocalDateTime;
13 import java.util.*;
14
15
16 @Slf4j
17 @EqualsAndHashCode(of = { "id" })
18 @ToString(of = { "id", "name" })
19 public class ChatRoom
20 {
21   @Getter
22   private final UUID id;
23   @Getter
24   private final String name;
25   private final Clock clock;
26   private final ChatRoomService service;
27   private final int bufferSize;
28   private Sinks.Many<Message> sink;
29
30   public ChatRoom(
31       UUID id,
32       String name,
33       Clock clock,
34       ChatRoomService service,
35       int bufferSize)
36   {
37     this.id = id;
38     this.name = name;
39     this.clock = clock;
40     this.service = service;
41     this.bufferSize = bufferSize;
42     this.sink = createSink();
43   }
44
45
46   synchronized public Mono<Message> addMessage(
47       Long id,
48       String user,
49       String text)
50   {
51     Message.MessageKey key = Message.MessageKey.of(user, id);
52     return service
53         .getMessage(key)
54         .flatMap(existing -> text.equals(existing.getMessageText())
55             ? Mono.just(existing)
56             : Mono.error(() -> new MessageMutationException(existing, text)))
57         .switchIfEmpty(
58             Mono
59                 .just(service.persistMessage(key, LocalDateTime.now(clock), text))
60                 .doOnNext(m ->
61                 {
62                   Sinks.EmitResult result = sink.tryEmitNext(m);
63                   if (result.isFailure())
64                   {
65                     log.warn("Emitting of message failed with {} for {}", result.name(), m);
66                   }
67                 }));
68   }
69
70
71   public Mono<Message> getMessage(String username, Long messageId)
72   {
73     Message.MessageKey key = Message.MessageKey.of(username, messageId);
74     return service.getMessage(key);
75   }
76
77   synchronized public Flux<Message> listen()
78   {
79     return sink
80         .asFlux()
81         .doOnCancel(() -> sink = createSink()); // Sink hast to be recreated on auto-cancel!
82   }
83
84   public Flux<Message> getMessages()
85   {
86     return getMessages(0, Long.MAX_VALUE);
87   }
88
89   public Flux<Message> getMessages(long first, long last)
90   {
91     return service.getMessages(first, last);
92   }
93
94   private Sinks.Many<Message> createSink()
95   {
96     return Sinks
97         .many()
98         .multicast()
99         .onBackpressureBuffer(bufferSize);
100   }
101 }