511b9ade69a07c85e96abb8cba10481ee0631f39
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / domain / ChatRoomData.java
1 package de.juplo.kafka.chat.backend.domain;
2
3 import lombok.extern.slf4j.Slf4j;
4 import reactor.core.publisher.Flux;
5 import reactor.core.publisher.Mono;
6 import reactor.core.publisher.Sinks;
7 import reactor.core.publisher.SynchronousSink;
8
9 import java.time.Clock;
10 import java.time.LocalDateTime;
11 import java.util.regex.Matcher;
12 import java.util.regex.Pattern;
13
14
15 @Slf4j
16 public class ChatRoomData
17 {
18   public final static Pattern VALID_USER = Pattern.compile("^[a-z0-9-]{2,}$");
19
20   private final ChatRoomService service;
21   private final Clock clock;
22   private final int bufferSize;
23   private Sinks.Many<Message> sink;
24
25
26   public ChatRoomData(
27       Clock clock,
28       ChatRoomService service,
29       int bufferSize)
30   {
31     log.info("Created ChatRoom with buffer-size {}", bufferSize);
32     this.clock = clock;
33     this.service = service;
34     this.bufferSize = bufferSize;
35     // @RequiredArgsConstructor unfortunately not possible, because
36     // the `bufferSize` is not set, if `createSink()` is called
37     // from the variable declaration!
38     this.sink = createSink();
39   }
40
41
42   synchronized public Mono<Message> addMessage(
43       Long id,
44       String user,
45       String text)
46   {
47     Matcher matcher = VALID_USER.matcher(user);
48     if (!matcher.matches())
49       throw new InvalidUsernameException(user);
50
51     Message.MessageKey key = Message.MessageKey.of(user, id);
52     return service
53         .getMessage(key)
54         .handle((Message existing, SynchronousSink<Message> sink) ->
55         {
56           if (existing.getMessageText().equals(text))
57           {
58             sink.next(existing);
59           }
60           else
61           {
62             sink.error(new MessageMutationException(existing, text));
63           }
64         })
65         .switchIfEmpty(
66             Mono
67                 .defer(() -> service.persistMessage(key, LocalDateTime.now(clock), text))
68                 .doOnNext(m ->
69                 {
70                   Sinks.EmitResult result = sink.tryEmitNext(m);
71                   if (result.isFailure())
72                   {
73                     log.warn("Emitting of message failed with {} for {}", result.name(), m);
74                   }
75                 }));
76   }
77
78
79   public ChatRoomService getChatRoomService()
80   {
81     return service;
82   }
83
84   public Mono<Message> getMessage(String username, Long messageId)
85   {
86     Message.MessageKey key = Message.MessageKey.of(username, messageId);
87     return service.getMessage(key);
88   }
89
90   synchronized public Flux<Message> listen()
91   {
92     return sink
93         .asFlux()
94         .doOnCancel(() -> sink = createSink()); // Sink hast to be recreated on auto-cancel!
95   }
96
97   public Flux<Message> getMessages()
98   {
99     return getMessages(0, Long.MAX_VALUE);
100   }
101
102   public Flux<Message> getMessages(long first, long last)
103   {
104     return service.getMessages(first, last);
105   }
106
107   private Sinks.Many<Message> createSink()
108   {
109     return Sinks
110         .many()
111         .multicast()
112         .onBackpressureBuffer(bufferSize);
113   }
114 }