test: `LocalJsonFilesStorageStrategyIT` asserts, that the state is restored
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / domain / ChatRoom.java
1 package de.juplo.kafka.chat.backend.domain;
2
3 import lombok.EqualsAndHashCode;
4 import lombok.Getter;
5 import lombok.ToString;
6 import lombok.extern.slf4j.Slf4j;
7 import reactor.core.publisher.Flux;
8 import reactor.core.publisher.Mono;
9 import reactor.core.publisher.Sinks;
10
11 import java.time.Clock;
12 import java.time.LocalDateTime;
13 import java.util.*;
14
15
16 @Slf4j
17 @EqualsAndHashCode(of = { "id" })
18 @ToString(of = { "id", "name" })
19 public class ChatRoom
20 {
21   @Getter
22   private final UUID id;
23   @Getter
24   private final String name;
25   private final Clock clock;
26   private final ChatRoomService service;
27   private final int bufferSize;
28   private Sinks.Many<Message> sink;
29
30   public ChatRoom(
31       UUID id,
32       String name,
33       Clock clock,
34       ChatRoomService service,
35       int bufferSize)
36   {
37     this.id = id;
38     this.name = name;
39     this.clock = clock;
40     this.service = service;
41     this.bufferSize = bufferSize;
42     this.sink = createSink();
43   }
44
45
46   synchronized public Mono<Message> addMessage(
47       Long id,
48       String user,
49       String text)
50   {
51     return service
52         .persistMessage(Message.MessageKey.of(user, id), LocalDateTime.now(clock), text)
53         .doOnNext(message ->
54         {
55           Sinks.EmitResult result = sink.tryEmitNext(message);
56           if (result.isFailure())
57           {
58             log.warn("Emitting of message failed with {} for {}", result.name(), message);
59           }
60         });
61   }
62
63
64   public Mono<Message> getMessage(String username, Long messageId)
65   {
66     Message.MessageKey key = Message.MessageKey.of(username, messageId);
67     return service.getMessage(key);
68   }
69
70   synchronized public Flux<Message> listen()
71   {
72     return sink
73         .asFlux()
74         .doOnCancel(() -> sink = createSink()); // Sink hast to be recreated on auto-cancel!
75   }
76
77   public Flux<Message> getMessages()
78   {
79     return getMessages(0, Long.MAX_VALUE);
80   }
81
82   public Flux<Message> getMessages(long first, long last)
83   {
84     return service.getMessages(first, last);
85   }
86
87   private Sinks.Many<Message> createSink()
88   {
89     return Sinks
90         .many()
91         .multicast()
92         .onBackpressureBuffer(bufferSize);
93   }
94 }