package de.juplo.kafka.chat.backend.domain;
-import org.junit.jupiter.api.BeforeEach;
import org.awaitility.Awaitility;
+import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
+import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.time.*;
now,
chatMessageService,
8);
+ chatRoomData.activate();
user = "foo";
messageId = 1l;
verify(chatMessageService, never()).persistMessage(any(), any(), any());
}
+ @Test
+ @DisplayName("Assert, that Mono sends an error, if a message is sent to a closed chat-room")
+ void testAddMessageToClosedChatRoomSendsError()
+ {
+ // Given
+ when(chatMessageService.getMessage(any(Message.MessageKey.class)))
+ .thenReturn(Mono.empty());
+ when(chatMessageService.persistMessage(any(Message.MessageKey.class), any(LocalDateTime.class), any(String.class)))
+ .thenReturn(Mono.just(someMessage()));
+
+ chatRoomData.deactivate();
+
+ // When
+ Mono<Message> mono = chatRoomData.addMessage(messageId, user, "Some text");
+
+ // Then
+ assertThat(mono).sendsError();
+ }
+
+ @Test
+ @DisplayName("Assert, that ChatMessageService.persistMessage() is not called if a message is sent to a closed chat-room")
+ void testAddMessageToClosedChatRoomDoesNotTriggerPersistence()
+ {
+ // Given
+ when(chatMessageService.getMessage(any(Message.MessageKey.class)))
+ .thenReturn(Mono.empty());
+ when(chatMessageService.persistMessage(any(Message.MessageKey.class), any(LocalDateTime.class), any(String.class)))
+ .thenReturn(Mono.just(someMessage()));
+
+ chatRoomData.deactivate();
+
+ // When
+ chatRoomData
+ .addMessage(messageId, user, "Some text")
+ .onErrorResume((throwable) -> Mono.empty())
+ .block();
+
+ // Then
+ verify(chatMessageService, never()).persistMessage(any(), any(), any());
+ }
+
@Test
@DisplayName("Assert, that a listener receives a message, that was added after the listening had started")
void testListenerReceivesMessageAddedAfterListeningStarts()
});
}
+ @Test
+ @DisplayName("Assert, that a listended to chat-room emits completed, if it is closed")
+ void testListenedToChatRoomEmitsCompletedIfItIsClosed()
+ {
+ // Given
+ Message message1 = new Message(key, 1l, timestamp, "#1");
+ Message message2 = new Message(key, 2l, timestamp, "#2");
+ Message message3 = new Message(key, 3l, timestamp, "#3");
+ Message message4 = new Message(key, 4l, timestamp, "#4");
+ when(chatMessageService.getMessage(any(Message.MessageKey.class)))
+ .thenReturn(Mono.empty());
+ when(chatMessageService.persistMessage(any(Message.MessageKey.class), any(LocalDateTime.class), any(String.class)))
+ .thenReturn(Mono.just(message1))
+ .thenReturn(Mono.just(message2))
+ .thenReturn(Mono.just(message3))
+ .thenReturn(Mono.just(message4));
+
+ chatRoomData.addMessage(messageId, user, "Some Text").block();
+ chatRoomData.addMessage(messageId, user, "Some Text").block();
+ chatRoomData.addMessage(messageId, user, "Some Text").block();
+ chatRoomData.addMessage(messageId, user, "Some Text").block();
+
+ // When
+ Flux<Message> listenFlux = chatRoomData.listen();
+ chatRoomData.deactivate();
+
+ // Then
+ assertThat(listenFlux).emitsExactly(
+ message1,
+ message2,
+ message3,
+ message4);
+ }
+
+ @Test
+ @DisplayName("Assert, that a listended to chat-room emits completed, if it is closed")
+ void testListeningToClosedChatRoomSendsError()
+ {
+ // Given
+ chatRoomData.deactivate();
+
+ // When
+ Flux<Message> listenFlux = chatRoomData.listen();
+
+ // Then
+ assertThat(listenFlux).sendsError();
+ }
+
/**
* This message is used, when methods of {@link ChatMessageService} are mocked,