package de.juplo.kafka.chat.backend.domain;
import org.junit.jupiter.api.BeforeEach;
+import org.awaitility.Awaitility;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Mono;
-import java.time.Clock;
-import java.time.Instant;
-import java.time.LocalDateTime;
-import java.time.ZoneId;
+import java.time.*;
+import java.util.LinkedList;
+import java.util.List;
import static org.mockito.Mockito.*;
import static pl.rzrz.assertj.reactor.Assertions.assertThat;
verify(chatMessageService, never()).persistMessage(any(), any(), any());
}
+ @Test
+ @DisplayName("Assert, that a listener receives a message, that was added after the listening had started")
+ void testListenerReceivesMessageAddedAfterListeningStarts()
+ {
+ // Given
+ when(chatMessageService.getMessage(any(Message.MessageKey.class)))
+ .thenReturn(Mono.empty());
+ when(chatMessageService.persistMessage(any(Message.MessageKey.class), any(LocalDateTime.class), any(String.class)))
+ .thenReturn(Mono.just(someMessage()));
+
+ // When
+ List<Message> receivedMessages = new LinkedList<>();
+ chatRoomData
+ .listen()
+ .subscribe(receivedMessage -> receivedMessages.add(receivedMessage));
+ Message sentMessage = chatRoomData
+ .addMessage(messageId, user, "Some Text")
+ .block();
+
+ // Then
+ Awaitility
+ .await()
+ .atMost(Duration.ofSeconds(1))
+ .untilAsserted(() -> assertThat(receivedMessages).contains(sentMessage));
+ }
+
+ @Test
+ @DisplayName("Assert, that a listener receives a message, that was added before the listening had started")
+ void testListenerReceivesMessageAddedBeforeListeningStarts()
+ {
+ // Given
+ when(chatMessageService.getMessage(any(Message.MessageKey.class)))
+ .thenReturn(Mono.empty());
+ when(chatMessageService.persistMessage(any(Message.MessageKey.class), any(LocalDateTime.class), any(String.class)))
+ .thenReturn(Mono.just(someMessage()));
+
+ // When
+ Message sentMessage = chatRoomData
+ .addMessage(messageId, user, "Some Text")
+ .block();
+ List<Message> receivedMessages = new LinkedList<>();
+ chatRoomData
+ .listen()
+ .subscribe(receivedMessage -> receivedMessages.add(receivedMessage));
+
+ // Then
+ Awaitility
+ .await()
+ .atMost(Duration.ofSeconds(1))
+ .untilAsserted(() -> assertThat(receivedMessages).contains(sentMessage));
+ }
+
+ @Test
+ @DisplayName("Assert, that a listener receives several messages, that were added before and after the listening had started, in correct order")
+ void testListenerReceivesMessagesFromBeforeAndAfterListeningHadStartedInCorrectOrder()
+ {
+ // Given
+ Message message1 = new Message(key, 1l, timestamp, "#1");
+ Message message2 = new Message(key, 2l, timestamp, "#2");
+ Message message3 = new Message(key, 3l, timestamp, "#3");
+ Message message4 = new Message(key, 4l, timestamp, "#4");
+ when(chatMessageService.getMessage(any(Message.MessageKey.class)))
+ .thenReturn(Mono.empty());
+ when(chatMessageService.persistMessage(any(Message.MessageKey.class), any(LocalDateTime.class), any(String.class)))
+ .thenReturn(Mono.just(message1))
+ .thenReturn(Mono.just(message2))
+ .thenReturn(Mono.just(message3))
+ .thenReturn(Mono.just(message4));
+
+ // When
+ Message[] sentMessages = new Message[4];
+ sentMessages[0] = chatRoomData.addMessage(messageId, user, "Some Text").block();
+ sentMessages[1] = chatRoomData.addMessage(messageId, user, "Some Text").block();
+ List<Message> receivedMessages = new LinkedList<>();
+ chatRoomData
+ .listen()
+ .subscribe(receivedMessage -> receivedMessages.add(receivedMessage));
+ sentMessages[2] = chatRoomData.addMessage(messageId, user, "Some Text").block();
+ sentMessages[3] = chatRoomData.addMessage(messageId, user, "Some Text").block();
+
+ // Then
+ Awaitility
+ .await()
+ .atMost(Duration.ofSeconds(1))
+ .untilAsserted(() -> assertThat(receivedMessages).contains(sentMessages));
+ }
+
/**
* This message is used, when methods of {@link ChatMessageService} are mocked,