package de.juplo.kafka.chat.backend;
+import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import de.juplo.kafka.chat.backend.api.ChatRoomInfoTo;
+import de.juplo.kafka.chat.backend.api.MessageTo;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.test.web.server.LocalServerPort;
+import org.springframework.core.io.Resource;
import org.springframework.http.MediaType;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.web.reactive.server.WebTestClient;
import org.testcontainers.shaded.org.awaitility.Awaitility;
+import reactor.core.publisher.Flux;
import java.io.IOException;
import java.time.Duration;
+import java.util.List;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.random.RandomGenerator;
import static org.assertj.core.api.Assertions.assertThat;
import static org.hamcrest.Matchers.endsWith;
@Autowired
ObjectMapper objectMapper;
+ @Value("classpath:data/files/5c73531c-6fc4-426c-adcb-afc5c140a0f7.json")
+ Resource existingChatRoomRessource;
+ MessageTo[] expectedExistingMessages;
+
@BeforeEach
- void waitForApp()
+ void waitForApp() throws IOException
{
+ expectedExistingMessages = objectMapper
+ .readValue(
+ existingChatRoomRessource.getInputStream(),
+ new TypeReference<List<MessageTo>>() {})
+ .toArray(size -> new MessageTo[size]);
+
Awaitility
.await()
.atMost(Duration.ofSeconds(15))
.jsonPath("$.text").isEqualTo("Hello world!");
});
}
+
+ @Test
+ @DisplayName("Only newly send messages can be seen, when listening to restored chat-room")
+ void testListenToRestoredChatRoomYieldsOnlyNewlyAddedMessages()
+ {
+ MessageTo sentMessage = webTestClient
+ .put()
+ .uri(
+ "http://localhost:{port}/{chatRoomId}/nerd/{messageId}",
+ port,
+ EXISTING_CHATROOM,
+ RandomGenerator.getDefault().nextInt())
+ .contentType(MediaType.TEXT_PLAIN)
+ .accept(MediaType.APPLICATION_JSON)
+ .bodyValue("Hello world!")
+ .exchange()
+ .expectStatus()
+ .isOk()
+ .returnResult(MessageTo.class)
+ .getResponseBody()
+ .next()
+ .block();
+
+ Flux<MessageTo> result = webTestClient
+ .get()
+ .uri(
+ "http://localhost:{port}/{chatRoomId}/listen",
+ port,
+ EXISTING_CHATROOM)
+ .accept(MediaType.TEXT_EVENT_STREAM)
+ .exchange()
+ .expectStatus().isOk()
+ .returnResult(MessageTo.class)
+ .getResponseBody();
+
+ assertThat(result.next().block()).isEqualTo(sentMessage);
+ }
}
package de.juplo.kafka.chat.backend.domain;
-import org.junit.jupiter.api.BeforeEach;
import org.awaitility.Awaitility;
+import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
+import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.time.*;
now,
chatMessageService,
8);
+ chatRoomData.activate();
user = "foo";
messageId = 1l;
verify(chatMessageService, never()).persistMessage(any(), any(), any());
}
+ @Test
+ @DisplayName("Assert, that Mono sends an error, if a message is sent to an inactive chat-room")
+ void testAddMessageToInactiveChatRoomSendsError()
+ {
+ // Given
+ when(chatMessageService.getMessage(any(Message.MessageKey.class)))
+ .thenReturn(Mono.empty());
+ when(chatMessageService.persistMessage(any(Message.MessageKey.class), any(LocalDateTime.class), any(String.class)))
+ .thenReturn(Mono.just(someMessage()));
+
+ chatRoomData.deactivate();
+
+ // When
+ Mono<Message> mono = chatRoomData.addMessage(messageId, user, "Some text");
+
+ // Then
+ assertThat(mono).sendsError();
+ }
+
+ @Test
+ @DisplayName("Assert, that ChatMessageService.persistMessage() is not called if a message is sent to an inactive chat-room")
+ void testAddMessageToInactiveChatRoomDoesNotTriggerPersistence()
+ {
+ // Given
+ when(chatMessageService.getMessage(any(Message.MessageKey.class)))
+ .thenReturn(Mono.empty());
+ when(chatMessageService.persistMessage(any(Message.MessageKey.class), any(LocalDateTime.class), any(String.class)))
+ .thenReturn(Mono.just(someMessage()));
+
+ chatRoomData.deactivate();
+
+ // When
+ chatRoomData
+ .addMessage(messageId, user, "Some text")
+ .onErrorResume((throwable) -> Mono.empty())
+ .block(Duration.ofSeconds(5));
+
+ // Then
+ verify(chatMessageService, never()).persistMessage(any(), any(), any());
+ }
+
@Test
@DisplayName("Assert, that a listener receives a message, that was added after the listening had started")
void testListenerReceivesMessageAddedAfterListeningStarts()
});
}
+ @Test
+ @DisplayName("Assert, that a listended to chat-room emits completed, if it is deactivated")
+ void testListenedToChatRoomEmitsCompletedIfItIsDeactivated()
+ {
+ // Given
+ Message message1 = new Message(key, 1l, timestamp, "#1");
+ Message message2 = new Message(key, 2l, timestamp, "#2");
+ Message message3 = new Message(key, 3l, timestamp, "#3");
+ Message message4 = new Message(key, 4l, timestamp, "#4");
+ when(chatMessageService.getMessage(any(Message.MessageKey.class)))
+ .thenReturn(Mono.empty());
+ when(chatMessageService.persistMessage(any(Message.MessageKey.class), any(LocalDateTime.class), any(String.class)))
+ .thenReturn(Mono.just(message1))
+ .thenReturn(Mono.just(message2))
+ .thenReturn(Mono.just(message3))
+ .thenReturn(Mono.just(message4));
+
+ chatRoomData.addMessage(messageId, user, "Some Text").block();
+ chatRoomData.addMessage(messageId, user, "Some Text").block();
+ chatRoomData.addMessage(messageId, user, "Some Text").block();
+ chatRoomData.addMessage(messageId, user, "Some Text").block();
+
+ // When
+ Flux<Message> listenFlux = chatRoomData.listen();
+ chatRoomData.deactivate();
+
+ // Then
+ Awaitility
+ .await()
+ .atMost(Duration.ofSeconds(1))
+ .untilAsserted(() -> assertThat(listenFlux).emitsExactly(message1, message2, message3, message4));
+ }
+
+ @Test
+ @DisplayName("Assert, that a listended to chat-room emits completed, if it is inactiv")
+ void testListeningToInactiveChatRoomSendsError()
+ {
+ // Given
+ chatRoomData.deactivate();
+
+ // When
+ Flux<Message> listenFlux = chatRoomData.listen();
+
+ // Then
+ Awaitility
+ .await()
+ .atMost(Duration.ofSeconds(1))
+ .untilAsserted(() -> assertThat(listenFlux).sendsError());
+ }
+
/**
* This message is used, when methods of {@link ChatMessageService} are mocked,