1 package de.juplo.kafka.chat.backend.domain;
3 import org.awaitility.Awaitility;
4 import org.junit.jupiter.api.BeforeEach;
5 import org.junit.jupiter.api.DisplayName;
6 import org.junit.jupiter.api.Test;
7 import reactor.core.publisher.Flux;
8 import reactor.core.publisher.Mono;
11 import java.util.LinkedList;
12 import java.util.List;
14 import static org.mockito.Mockito.*;
15 import static pl.rzrz.assertj.reactor.Assertions.assertThat;
18 public class ChatRoomDataTest
21 ChatMessageService chatMessageService;
22 ChatRoomData chatRoomData;
26 Message.MessageKey key;
27 LocalDateTime timestamp;
33 now = Clock.fixed(Instant.now(), ZoneId.systemDefault());
34 chatMessageService = mock(ChatMessageService.class);
35 chatRoomData = new ChatRoomData(
39 chatRoomData.activate();
43 key = Message.MessageKey.of(user, messageId);
44 timestamp = LocalDateTime.now(now);
49 @DisplayName("Assert, that Mono emits expected message, if it exists")
50 void testGetExistingMessage()
53 when(chatMessageService.getMessage(any(Message.MessageKey.class)))
54 .thenReturn(Mono.just(someMessage()));
57 Mono<Message> mono = chatRoomData.getMessage(user, messageId);
60 assertThat(mono).emitsExactly(someMessage());
64 @DisplayName("Assert, that Mono is empty, if message does not exists")
65 void testGetNonExistentMessage()
68 when(chatMessageService.getMessage(any(Message.MessageKey.class)))
69 .thenReturn(Mono.empty());
72 Mono<Message> mono = chatRoomData.getMessage(user, messageId);
75 assertThat(mono).emitsCount(0);
79 @DisplayName("Assert, that Mono emits the persisted message, if a new message is added")
80 void testAddNewMessageEmitsPersistedMessage()
83 when(chatMessageService.getMessage(any(Message.MessageKey.class)))
84 .thenReturn(Mono.empty());
85 when(chatMessageService.persistMessage(any(Message.MessageKey.class), any(LocalDateTime.class), any(String.class)))
86 .thenReturn(Mono.just(someMessage()));
89 Mono<Message> mono = chatRoomData.addMessage(messageId, user, "Some Text");
92 assertThat(mono).emitsExactly(someMessage());
96 @DisplayName("Assert, that ChatMessageService.persistMessage() is called correctly, if a new message is added")
97 void testAddNewMessageTriggersPersistence()
100 String messageText = "Bar";
101 when(chatMessageService.getMessage(any(Message.MessageKey.class)))
102 .thenReturn(Mono.empty());
103 when(chatMessageService.persistMessage(any(Message.MessageKey.class), any(LocalDateTime.class), any(String.class)))
104 .thenReturn(Mono.just(someMessage()));
108 .addMessage(messageId, user, messageText)
112 verify(chatMessageService, times(1)).persistMessage(eq(key), eq(timestamp), eq(messageText));
116 @DisplayName("Assert, that Mono emits the already persisted message, if an unchanged message is added")
117 void testAddUnchangedMessageEmitsAlreadyPersistedMessage()
120 String messageText = "Bar";
121 Message existingMessage = new Message(key, 0l, timestamp, messageText);
122 when(chatMessageService.getMessage(any(Message.MessageKey.class)))
123 .thenReturn(Mono.just(existingMessage));
126 Mono<Message> mono = chatRoomData.addMessage(messageId, user, messageText);
129 assertThat(mono).emitsExactly(existingMessage);
133 @DisplayName("Assert, that ChatMessageService.persistMessage() is not called, if an unchanged message is added")
134 void testAddUnchangedMessageDoesNotTriggerPersistence()
137 String messageText = "Bar";
138 Message existingMessage = new Message(key, 0l, timestamp, messageText);
139 when(chatMessageService.getMessage(any(Message.MessageKey.class)))
140 .thenReturn(Mono.just(existingMessage));
144 .addMessage(messageId, user, messageText)
148 verify(chatMessageService, never()).persistMessage(any(), any(), any());
152 @DisplayName("Assert, that Mono sends an error, if a message is added again with mutated text")
153 void testAddMutatedMessageSendsError()
156 String messageText = "Bar";
157 String mutatedText = "Boom!";
158 Message existingMessage = new Message(key, 0l, timestamp, messageText);
159 when(chatMessageService.getMessage(any(Message.MessageKey.class)))
160 .thenReturn(Mono.just(existingMessage));
163 Mono<Message> mono = chatRoomData.addMessage(messageId, user, mutatedText);
166 assertThat(mono).sendsError();
170 @DisplayName("Assert, that ChatMessageService.persistMessage() is not called, if a message is added again with mutated text")
171 void testAddMutatedDoesNotTriggerPersistence()
174 String messageText = "Bar";
175 String mutatedText = "Boom!";
176 Message existingMessage = new Message(key, 0l, timestamp, messageText);
177 when(chatMessageService.getMessage(any(Message.MessageKey.class)))
178 .thenReturn(Mono.just(existingMessage));
182 .addMessage(messageId, user, mutatedText)
183 .onErrorResume((throwable) -> Mono.empty())
187 verify(chatMessageService, never()).persistMessage(any(), any(), any());
191 @DisplayName("Assert, that Mono sends an error, if a message is sent to a closed chat-room")
192 void testAddMessageToClosedChatRoomSendsError()
195 when(chatMessageService.getMessage(any(Message.MessageKey.class)))
196 .thenReturn(Mono.empty());
197 when(chatMessageService.persistMessage(any(Message.MessageKey.class), any(LocalDateTime.class), any(String.class)))
198 .thenReturn(Mono.just(someMessage()));
200 chatRoomData.deactivate();
203 Mono<Message> mono = chatRoomData.addMessage(messageId, user, "Some text");
206 assertThat(mono).sendsError();
210 @DisplayName("Assert, that ChatMessageService.persistMessage() is not called if a message is sent to an inactive chat-room")
211 void testAddMessageToClosedChatRoomDoesNotTriggerPersistence()
214 when(chatMessageService.getMessage(any(Message.MessageKey.class)))
215 .thenReturn(Mono.empty());
216 when(chatMessageService.persistMessage(any(Message.MessageKey.class), any(LocalDateTime.class), any(String.class)))
217 .thenReturn(Mono.just(someMessage()));
219 chatRoomData.deactivate();
223 .addMessage(messageId, user, "Some text")
224 .onErrorResume((throwable) -> Mono.empty())
228 verify(chatMessageService, never()).persistMessage(any(), any(), any());
232 @DisplayName("Assert, that a listener receives a message, that was added after the listening had started")
233 void testListenerReceivesMessageAddedAfterListeningStarts()
236 when(chatMessageService.getMessage(any(Message.MessageKey.class)))
237 .thenReturn(Mono.empty());
238 when(chatMessageService.persistMessage(any(Message.MessageKey.class), any(LocalDateTime.class), any(String.class)))
239 .thenReturn(Mono.just(someMessage()));
242 List<Message> receivedMessages = new LinkedList<>();
245 .subscribe(receivedMessage -> receivedMessages.add(receivedMessage));
246 Message sentMessage = chatRoomData
247 .addMessage(messageId, user, "Some Text")
253 .atMost(Duration.ofSeconds(1))
254 .untilAsserted(() -> assertThat(receivedMessages).contains(sentMessage));
258 @DisplayName("Assert, that a listener receives a message, that was added before the listening had started")
259 void testListenerReceivesMessageAddedBeforeListeningStarts()
262 when(chatMessageService.getMessage(any(Message.MessageKey.class)))
263 .thenReturn(Mono.empty());
264 when(chatMessageService.persistMessage(any(Message.MessageKey.class), any(LocalDateTime.class), any(String.class)))
265 .thenReturn(Mono.just(someMessage()));
268 Message sentMessage = chatRoomData
269 .addMessage(messageId, user, "Some Text")
271 List<Message> receivedMessages = new LinkedList<>();
274 .subscribe(receivedMessage -> receivedMessages.add(receivedMessage));
279 .atMost(Duration.ofSeconds(1))
280 .untilAsserted(() -> assertThat(receivedMessages).contains(sentMessage));
284 @DisplayName("Assert, that a listener receives several messages, that were added before and after the listening had started, in correct order")
285 void testListenerReceivesMessagesFromBeforeAndAfterListeningHadStartedInCorrectOrder()
288 Message message1 = new Message(key, 1l, timestamp, "#1");
289 Message message2 = new Message(key, 2l, timestamp, "#2");
290 Message message3 = new Message(key, 3l, timestamp, "#3");
291 Message message4 = new Message(key, 4l, timestamp, "#4");
292 when(chatMessageService.getMessage(any(Message.MessageKey.class)))
293 .thenReturn(Mono.empty());
294 when(chatMessageService.persistMessage(any(Message.MessageKey.class), any(LocalDateTime.class), any(String.class)))
295 .thenReturn(Mono.just(message1))
296 .thenReturn(Mono.just(message2))
297 .thenReturn(Mono.just(message3))
298 .thenReturn(Mono.just(message4));
301 Message[] sentMessages = new Message[4];
302 sentMessages[0] = chatRoomData.addMessage(messageId, user, "Some Text").block();
303 sentMessages[1] = chatRoomData.addMessage(messageId, user, "Some Text").block();
304 List<Message> receivedMessages = new LinkedList<>();
307 .subscribe(receivedMessage -> receivedMessages.add(receivedMessage));
308 sentMessages[2] = chatRoomData.addMessage(messageId, user, "Some Text").block();
309 sentMessages[3] = chatRoomData.addMessage(messageId, user, "Some Text").block();
314 .atMost(Duration.ofSeconds(1))
315 .untilAsserted(() -> assertThat(receivedMessages).contains(sentMessages));
319 @DisplayName("Assert, that multiple listeners can receive an added message")
320 void testMultipleListeners()
323 Message message1 = new Message(key, 1l, timestamp, "#1");
324 Message message2 = new Message(key, 2l, timestamp, "#2");
325 Message message3 = new Message(key, 3l, timestamp, "#3");
326 Message message4 = new Message(key, 4l, timestamp, "#4");
327 when(chatMessageService.getMessage(any(Message.MessageKey.class)))
328 .thenReturn(Mono.empty());
329 when(chatMessageService.persistMessage(any(Message.MessageKey.class), any(LocalDateTime.class), any(String.class)))
330 .thenReturn(Mono.just(message1))
331 .thenReturn(Mono.just(message2))
332 .thenReturn(Mono.just(message3))
333 .thenReturn(Mono.just(message4));
336 Message[] sentMessages = new Message[4];
337 List<Message> messagesReceivedByListener1 = new LinkedList<>();
340 .subscribe(receivedMessage -> messagesReceivedByListener1.add(receivedMessage));
341 sentMessages[0] = chatRoomData.addMessage(messageId, user, "Some Text").block();
342 sentMessages[1] = chatRoomData.addMessage(messageId, user, "Some Text").block();
343 List<Message> messagesReceivedByListener2 = new LinkedList<>();
346 .subscribe(receivedMessage -> messagesReceivedByListener2.add(receivedMessage));
347 sentMessages[2] = chatRoomData.addMessage(messageId, user, "Some Text").block();
348 List<Message> messagesReceivedByListener3 = new LinkedList<>();
351 .subscribe(receivedMessage -> messagesReceivedByListener3.add(receivedMessage));
352 sentMessages[3] = chatRoomData.addMessage(messageId, user, "Some Text").block();
353 List<Message> messagesReceivedByListener4 = new LinkedList<>();
356 .subscribe(receivedMessage -> messagesReceivedByListener4.add(receivedMessage));
361 .atMost(Duration.ofSeconds(1))
364 assertThat(messagesReceivedByListener1).contains(sentMessages);
365 assertThat(messagesReceivedByListener2).contains(sentMessages);
366 assertThat(messagesReceivedByListener3).contains(sentMessages);
367 assertThat(messagesReceivedByListener4).contains(sentMessages);
372 @DisplayName("Assert, that a listended to chat-room emits completed, if it is deactivated")
373 void testListenedToChatRoomEmitsCompletedIfItIsClosed()
376 Message message1 = new Message(key, 1l, timestamp, "#1");
377 Message message2 = new Message(key, 2l, timestamp, "#2");
378 Message message3 = new Message(key, 3l, timestamp, "#3");
379 Message message4 = new Message(key, 4l, timestamp, "#4");
380 when(chatMessageService.getMessage(any(Message.MessageKey.class)))
381 .thenReturn(Mono.empty());
382 when(chatMessageService.persistMessage(any(Message.MessageKey.class), any(LocalDateTime.class), any(String.class)))
383 .thenReturn(Mono.just(message1))
384 .thenReturn(Mono.just(message2))
385 .thenReturn(Mono.just(message3))
386 .thenReturn(Mono.just(message4));
388 chatRoomData.addMessage(messageId, user, "Some Text").block();
389 chatRoomData.addMessage(messageId, user, "Some Text").block();
390 chatRoomData.addMessage(messageId, user, "Some Text").block();
391 chatRoomData.addMessage(messageId, user, "Some Text").block();
394 Flux<Message> listenFlux = chatRoomData.listen();
395 chatRoomData.deactivate();
398 assertThat(listenFlux).emitsExactly(
406 @DisplayName("Assert, that a listended to chat-room emits completed, if it is closed")
407 void testListeningToClosedChatRoomSendsError()
410 chatRoomData.deactivate();
413 Flux<Message> listenFlux = chatRoomData.listen();
416 assertThat(listenFlux).sendsError();
/**
 * Creates the message that is returned by the mocked methods of
 * {@link ChatMessageService} that return a {@link Message}.
 * Its contents are deliberately set to arbitrary values, to underline that the
 * tests can only assert that the message returned by
 * {@link ChatMessageService} is passed on correctly by {@link ChatRoomData}.
 */
428 private Message someMessage()
431 Message.MessageKey.of("FOO", 666l),
433 LocalDateTime.of(2024, 3, 8, 12, 13, 00),
434 "Just some message...");