20c046d810a37231ba2a5a693db9ac7129b12aea
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / domain / ChatRoomData.java
1 package de.juplo.kafka.chat.backend.domain;
2
3 import de.juplo.kafka.chat.backend.domain.exceptions.ChatRoomInactiveException;
4 import de.juplo.kafka.chat.backend.domain.exceptions.InvalidUsernameException;
5 import de.juplo.kafka.chat.backend.domain.exceptions.MessageMutationException;
6 import lombok.extern.slf4j.Slf4j;
7 import reactor.core.publisher.Flux;
8 import reactor.core.publisher.Mono;
9 import reactor.core.publisher.Sinks;
10 import reactor.core.publisher.SynchronousSink;
11
12 import java.time.Clock;
13 import java.time.LocalDateTime;
14 import java.util.regex.Matcher;
15 import java.util.regex.Pattern;
16
17
18 @Slf4j
19 public class ChatRoomData
20 {
21   public final static Pattern VALID_USER = Pattern.compile("^[a-z0-9-]{2,}$");
22
23   private final ChatMessageService service;
24   private final Clock clock;
25   private final int historyLimit;
26   private Sinks.Many<Message> sink;
27   private volatile boolean active = false;
28
29
30   public ChatRoomData(
31       Clock clock,
32       ChatMessageService service,
33       int historyLimit)
34   {
35     log.info("Created ChatRoom with history-limit {}", historyLimit);
36     this.clock = clock;
37     this.service = service;
38     this.historyLimit = historyLimit;
39     // @RequiredArgsConstructor unfortunately not possible, because
40     // the `historyLimit` is not set, if `createSink()` is called
41     // from the variable declaration!
42   }
43
44
45   synchronized public Mono<Message> addMessage(
46       Long id,
47       String user,
48       String text)
49   {
50     Matcher matcher = VALID_USER.matcher(user);
51     if (!matcher.matches())
52       throw new InvalidUsernameException(user);
53
54     Message.MessageKey key = Message.MessageKey.of(user, id);
55     return service
56         .getMessage(key)
57         .handle((Message existing, SynchronousSink<Message> sink) ->
58         {
59           if (existing.getMessageText().equals(text))
60           {
61             sink.next(existing);
62           }
63           else
64           {
65             sink.error(new MessageMutationException(existing, text));
66           }
67         })
68         .switchIfEmpty(active
69             ? Mono
70                 .defer(() -> service.persistMessage(key, LocalDateTime.now(clock), text))
71                 .doOnNext(m ->
72                 {
73                   Sinks.EmitResult result = sink.tryEmitNext(m);
74                   if (result.isFailure())
75                   {
76                     log.warn("Emitting of message failed with {} for {}", result.name(), m);
77                   }
78                 })
79             : Mono.error(new ChatRoomInactiveException(service.getChatRoomId())));
80   }
81
82
83   public ChatMessageService getChatRoomService()
84   {
85     return service;
86   }
87
88   public Mono<Message> getMessage(String username, Long messageId)
89   {
90     Message.MessageKey key = Message.MessageKey.of(username, messageId);
91     return service.getMessage(key);
92   }
93
94   synchronized public Flux<Message> listen()
95   {
96     return active
97         ? sink
98             .asFlux()
99             .doOnCancel(() -> sink = createSink()) // Sink hast to be recreated on auto-cancel!
100         : Flux
101             .error(new ChatRoomInactiveException(service.getChatRoomId()));
102
103   }
104
105   public Flux<Message> getMessages()
106   {
107     return getMessages(0, Long.MAX_VALUE);
108   }
109
110   public Flux<Message> getMessages(long first, long last)
111   {
112     return service.getMessages(first, last);
113   }
114
115   public void activate()
116   {
117     if (active)
118     {
119       log.info("{} is already active!", service.getChatRoomId());
120       return;
121     }
122
123     log.info("{} is being activated", service.getChatRoomId());
124     this.sink = createSink();
125     active = true;
126   }
127
128   public void deactivate()
129   {
130     log.info("{} is being deactivated", service.getChatRoomId());
131     active = false;
132     sink.emitComplete(Sinks.EmitFailureHandler.FAIL_FAST);
133   }
134
135   private Sinks.Many<Message> createSink()
136   {
137     return Sinks
138         .many()
139         .replay()
140         .limit(historyLimit);
141   }
142 }