4f855b8c4d3b1eed7430896f9c049e9acbee79fb
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / domain / ChatRoom.java
1 package de.juplo.kafka.chat.backend.domain;
2
3 import lombok.EqualsAndHashCode;
4 import lombok.Getter;
5 import lombok.ToString;
6 import lombok.extern.slf4j.Slf4j;
7 import reactor.core.publisher.Flux;
8 import reactor.core.publisher.Mono;
9 import reactor.core.publisher.Sinks;
10
11 import java.time.Clock;
12 import java.time.LocalDateTime;
13 import java.util.*;
14 import java.util.regex.Matcher;
15 import java.util.regex.Pattern;
16
17
18 @Slf4j
19 @EqualsAndHashCode(of = { "id" })
20 @ToString(of = { "id", "name" })
21 public class ChatRoom
22 {
23   public final static Pattern VALID_USER = Pattern.compile("^[a-z0-9-]{2,}$");
24   @Getter
25   private final UUID id;
26   @Getter
27   private final String name;
28   @Getter
29   private final int shard;
30   private final Clock clock;
31   private final ChatRoomService service;
32   private final int bufferSize;
33   private Sinks.Many<Message> sink;
34
35
36   public ChatRoom(
37       UUID id,
38       String name,
39       int shard,
40       Clock clock,
41       ChatRoomService service,
42       int bufferSize)
43   {
44     log.info("Created ChatRoom {} with buffer-size {}", id, bufferSize);
45     this.id = id;
46     this.name = name;
47     this.shard = shard;
48     this.clock = clock;
49     this.service = service;
50     this.bufferSize = bufferSize;
51     // @RequiredArgsConstructor unfortunately not possible, because
52     // the `bufferSize` is not set, if `createSink()` is called
53     // from the variable declaration!
54     this.sink = createSink();
55   }
56
57
58   synchronized public Mono<Message> addMessage(
59       Long id,
60       String user,
61       String text)
62   {
63     Matcher matcher = VALID_USER.matcher(user);
64     if (!matcher.matches())
65       throw new InvalidUsernameException(user);
66
67     Message.MessageKey key = Message.MessageKey.of(user, id);
68     return service
69         .getMessage(key)
70         .flatMap(existing -> text.equals(existing.getMessageText())
71             ? Mono.just(existing)
72             : Mono.error(() -> new MessageMutationException(existing, text)))
73         .switchIfEmpty(
74             Mono
75                 .fromSupplier(() ->service.persistMessage(key, LocalDateTime.now(clock), text))
76                 .doOnNext(m ->
77                 {
78                   Sinks.EmitResult result = sink.tryEmitNext(m);
79                   if (result.isFailure())
80                   {
81                     log.warn("Emitting of message failed with {} for {}", result.name(), m);
82                   }
83                 }));
84   }
85
86
87   public Mono<Message> getMessage(String username, Long messageId)
88   {
89     Message.MessageKey key = Message.MessageKey.of(username, messageId);
90     return service.getMessage(key);
91   }
92
93   synchronized public Flux<Message> listen()
94   {
95     return sink
96         .asFlux()
97         .doOnCancel(() -> sink = createSink()); // Sink hast to be recreated on auto-cancel!
98   }
99
100   public Flux<Message> getMessages()
101   {
102     return getMessages(0, Long.MAX_VALUE);
103   }
104
105   public Flux<Message> getMessages(long first, long last)
106   {
107     return service.getMessages(first, last);
108   }
109
110   private Sinks.Many<Message> createSink()
111   {
112     return Sinks
113         .many()
114         .multicast()
115         .onBackpressureBuffer(bufferSize);
116   }
117 }