From: Kai Moritz Date: Sun, 3 Sep 2023 17:49:59 +0000 (+0200) Subject: refactor: Renamed `ChatRoom` into `ChatRoomData` - Moved classes X-Git-Tag: rebase--2023-09-06--23-24~1 X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=3ecd2ecb2a97ac9b4678eee63a2d5edb6d9a97ea;p=demos%2Fkafka%2Fchat refactor: Renamed `ChatRoom` into `ChatRoomData` - Moved classes --- diff --git a/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoom.java b/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoom.java deleted file mode 100644 index c66b887d..00000000 --- a/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoom.java +++ /dev/null @@ -1,118 +0,0 @@ -package de.juplo.kafka.chat.backend.domain; - -import lombok.extern.slf4j.Slf4j; -import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; -import reactor.core.publisher.Sinks; -import reactor.core.publisher.SynchronousSink; - -import java.time.Clock; -import java.time.LocalDateTime; -import java.util.*; -import java.util.regex.Matcher; -import java.util.regex.Pattern; - - -@Slf4j -public class ChatRoom extends ChatRoomInfo -{ - public final static Pattern VALID_USER = Pattern.compile("^[a-z0-9-]{2,}$"); - private final Clock clock; - private final ChatRoomService service; - private final int bufferSize; - private Sinks.Many sink; - - - public ChatRoom( - UUID id, - String name, - Integer shard, - Clock clock, - ChatRoomService service, - int bufferSize) - { - super(id, name, shard); - log.info("Created ChatRoom {} with buffer-size {}", id, bufferSize); - this.clock = clock; - this.service = service; - this.bufferSize = bufferSize; - // @RequiredArgsConstructor unfortunately not possible, because - // the `bufferSize` is not set, if `createSink()` is called - // from the variable declaration! - this.sink = createSink(); - } - - - synchronized public Mono addMessage( - Long id, - String user, - String text) - { - Matcher matcher = VALID_USER.matcher(user); - if (!matcher.matches()) - throw new InvalidUsernameException(user); - - Message.MessageKey key = Message.MessageKey.of(user, id); - return service - .getMessage(key) - .handle((Message existing, SynchronousSink sink) -> - { - if (existing.getMessageText().equals(text)) - { - sink.next(existing); - } - else - { - sink.error(new MessageMutationException(existing, text)); - } - }) - .switchIfEmpty( - Mono - .defer(() -> service.persistMessage(key, LocalDateTime.now(clock), text)) - .doOnNext(m -> - { - Sinks.EmitResult result = sink.tryEmitNext(m); - if (result.isFailure()) - { - log.warn("Emitting of message failed with {} for {}", result.name(), m); - } - })); - } - - - public ChatRoomService getChatRoomService() - { - return service; - } - - public Mono getMessage(String username, Long messageId) - { - Message.MessageKey key = Message.MessageKey.of(username, messageId); - return service.getMessage(key); - } - - synchronized public Flux listen() - { - return sink - .asFlux() - .doOnCancel(() -> sink = createSink()); // Sink hast to be recreated on auto-cancel! - } - - public Flux getMessages() - { - return getMessages(0, Long.MAX_VALUE); - } - - public Flux getMessages(long first, long last) - { - return service.getMessages(first, last); - } - - private Sinks.Many createSink() - { - return Sinks - .many() - .multicast() - .onBackpressureBuffer(bufferSize); - } -} diff --git a/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoomData.java b/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoomData.java new file mode 100644 index 00000000..873e58e7 --- /dev/null +++ b/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoomData.java @@ -0,0 +1,114 @@ +package de.juplo.kafka.chat.backend.domain; + +import lombok.extern.slf4j.Slf4j; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.core.publisher.Sinks; +import reactor.core.publisher.SynchronousSink; + +import java.time.Clock; +import java.time.LocalDateTime; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + + +@Slf4j +public class ChatRoom +{ + public final static Pattern VALID_USER = Pattern.compile("^[a-z0-9-]{2,}$"); + + private final Clock clock; + private final ChatRoomService service; + private final int bufferSize; + private Sinks.Many sink; + + + public ChatRoom( + Clock clock, + ChatRoomService service, + int bufferSize) + { + log.info("Created ChatRoom with buffer-size {}", bufferSize); + this.clock = clock; + this.service = service; + this.bufferSize = bufferSize; + // @RequiredArgsConstructor unfortunately not possible, because + // the `bufferSize` is not set, if `createSink()` is called + // from the variable declaration! + this.sink = createSink(); + } + + + synchronized public Mono addMessage( + Long id, + String user, + String text) + { + Matcher matcher = VALID_USER.matcher(user); + if (!matcher.matches()) + throw new InvalidUsernameException(user); + + Message.MessageKey key = Message.MessageKey.of(user, id); + return service + .getMessage(key) + .handle((Message existing, SynchronousSink sink) -> + { + if (existing.getMessageText().equals(text)) + { + sink.next(existing); + } + else + { + sink.error(new MessageMutationException(existing, text)); + } + }) + .switchIfEmpty( + Mono + .defer(() -> service.persistMessage(key, LocalDateTime.now(clock), text)) + .doOnNext(m -> + { + Sinks.EmitResult result = sink.tryEmitNext(m); + if (result.isFailure()) + { + log.warn("Emitting of message failed with {} for {}", result.name(), m); + } + })); + } + + + public ChatRoomService getChatRoomService() + { + return service; + } + + public Mono getMessage(String username, Long messageId) + { + Message.MessageKey key = Message.MessageKey.of(username, messageId); + return service.getMessage(key); + } + + synchronized public Flux listen() + { + return sink + .asFlux() + .doOnCancel(() -> sink = createSink()); // Sink hast to be recreated on auto-cancel! + } + + public Flux getMessages() + { + return getMessages(0, Long.MAX_VALUE); + } + + public Flux getMessages(long first, long last) + { + return service.getMessages(first, last); + } + + private Sinks.Many createSink() + { + return Sinks + .many() + .multicast() + .onBackpressureBuffer(bufferSize); + } +} diff --git a/src/test/java/de/juplo/kafka/chat/backend/domain/ChatRoomDataTest.java b/src/test/java/de/juplo/kafka/chat/backend/domain/ChatRoomDataTest.java new file mode 100644 index 00000000..822ffe77 --- /dev/null +++ b/src/test/java/de/juplo/kafka/chat/backend/domain/ChatRoomDataTest.java @@ -0,0 +1,160 @@ +package de.juplo.kafka.chat.backend.domain; + +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; +import reactor.core.publisher.Mono; + +import java.time.Clock; +import java.time.Instant; +import java.time.LocalDateTime; +import java.time.ZoneId; +import java.util.UUID; + +import static org.mockito.Mockito.*; +import static pl.rzrz.assertj.reactor.Assertions.assertThat; + + +public class ChatRoomTest +{ + @Test + @DisplayName("Assert, that Mono emits expected message, if it exists") + void testGetExistingMessage() + { + // Given + String user = "foo"; + Long messageId = 1l; + ChatRoomService chatRoomService = mock(ChatRoomService.class); + ChatRoom chatRoom = new ChatRoom( + UUID.randomUUID(), + "Foo", + 0, + Clock.systemDefaultZone(), + chatRoomService, + 8); + Message.MessageKey key = Message.MessageKey.of(user, messageId); + LocalDateTime timestamp = LocalDateTime.now(); + Message message = new Message(key, 0l, timestamp, "Bar"); + when(chatRoomService.getMessage(any(Message.MessageKey.class))).thenReturn(Mono.just(message)); + + // When + Mono mono = chatRoom.getMessage(user, messageId); + + // Then + assertThat(mono).emitsExactly(message); + } + + @Test + @DisplayName("Assert, that Mono if empty, if message does not exists") + void testGetNonExistentMessage() + { + // Given + String user = "foo"; + Long messageId = 1l; + ChatRoomService chatRoomService = mock(ChatRoomService.class); + ChatRoom chatRoom = new ChatRoom( + UUID.randomUUID(), + "Foo", + 0, + Clock.systemDefaultZone(), + chatRoomService, + 8); + when(chatRoomService.getMessage(any(Message.MessageKey.class))).thenReturn(Mono.empty()); + + // When + Mono mono = chatRoom.getMessage(user, messageId); + + // Then + assertThat(mono).emitsCount(0); + } + + @Test + @DisplayName("Assert, that Mono emits expected message, if a new message is added") + void testAddNewMessage() + { + // Given + String user = "foo"; + Long messageId = 1l; + ChatRoomService chatRoomService = mock(ChatRoomService.class); + ChatRoom chatRoom = new ChatRoom( + UUID.randomUUID(), + "Foo", + 0, + Clock.systemDefaultZone(), + chatRoomService, + 8); + Message.MessageKey key = Message.MessageKey.of(user, messageId); + Clock now = Clock.fixed(Instant.now(), ZoneId.systemDefault()); + LocalDateTime timestamp = LocalDateTime.now(now); + String messageText = "Bar"; + Message message = new Message(key, 0l, timestamp, messageText); + when(chatRoomService.getMessage(any(Message.MessageKey.class))).thenReturn(Mono.empty()); + when(chatRoomService.persistMessage(any(Message.MessageKey.class), any(LocalDateTime.class), any(String.class))).thenReturn(Mono.just(message)); + + // When + Mono mono = chatRoom.addMessage(messageId, user, messageText); + + // Then + assertThat(mono).emitsExactly(message); + } + + @Test + @DisplayName("Assert, that Mono emits expected message, if an unchanged message is added") + void testAddUnchangedMessage() + { + // Given + String user = "foo"; + Long messageId = 1l; + ChatRoomService chatRoomService = mock(ChatRoomService.class); + ChatRoom chatRoom = new ChatRoom( + UUID.randomUUID(), + "Foo", + 0, + Clock.systemDefaultZone(), + chatRoomService, + 8); + Message.MessageKey key = Message.MessageKey.of(user, messageId); + Clock now = Clock.fixed(Instant.now(), ZoneId.systemDefault()); + LocalDateTime timestamp = LocalDateTime.now(now); + String messageText = "Bar"; + Message message = new Message(key, 0l, timestamp, messageText); + when(chatRoomService.getMessage(any(Message.MessageKey.class))).thenReturn(Mono.just(message)); + when(chatRoomService.persistMessage(any(Message.MessageKey.class), any(LocalDateTime.class), any(String.class))).thenReturn(Mono.just(message)); + + // When + Mono mono = chatRoom.addMessage(messageId, user, messageText); + + // Then + assertThat(mono).emitsExactly(message); + } + + @Test + @DisplayName("Assert, that Mono sends an error, if a message is added again with mutated text") + void testAddMutatedMessage() + { + // Given + String user = "foo"; + Long messageId = 1l; + ChatRoomService chatRoomService = mock(ChatRoomService.class); + ChatRoom chatRoom = new ChatRoom( + UUID.randomUUID(), + "Foo", + 0, + Clock.systemDefaultZone(), + chatRoomService, + 8); + Message.MessageKey key = Message.MessageKey.of(user, messageId); + Clock now = Clock.fixed(Instant.now(), ZoneId.systemDefault()); + LocalDateTime timestamp = LocalDateTime.now(now); + String messageText = "Bar"; + String mutatedText = "Boom!"; + Message message = new Message(key, 0l, timestamp, messageText); + when(chatRoomService.getMessage(any(Message.MessageKey.class))).thenReturn(Mono.just(message)); + when(chatRoomService.persistMessage(any(Message.MessageKey.class), any(LocalDateTime.class), any(String.class))).thenReturn(Mono.just(message)); + + // When + Mono mono = chatRoom.addMessage(messageId, user, mutatedText); + + // Then + assertThat(mono).sendsError(); + } +} diff --git a/src/test/java/de/juplo/kafka/chat/backend/domain/ChatRoomTest.java b/src/test/java/de/juplo/kafka/chat/backend/domain/ChatRoomTest.java deleted file mode 100644 index 822ffe77..00000000 --- a/src/test/java/de/juplo/kafka/chat/backend/domain/ChatRoomTest.java +++ /dev/null @@ -1,160 +0,0 @@ -package de.juplo.kafka.chat.backend.domain; - -import org.junit.jupiter.api.DisplayName; -import org.junit.jupiter.api.Test; -import reactor.core.publisher.Mono; - -import java.time.Clock; -import java.time.Instant; -import java.time.LocalDateTime; -import java.time.ZoneId; -import java.util.UUID; - -import static org.mockito.Mockito.*; -import static pl.rzrz.assertj.reactor.Assertions.assertThat; - - -public class ChatRoomTest -{ - @Test - @DisplayName("Assert, that Mono emits expected message, if it exists") - void testGetExistingMessage() - { - // Given - String user = "foo"; - Long messageId = 1l; - ChatRoomService chatRoomService = mock(ChatRoomService.class); - ChatRoom chatRoom = new ChatRoom( - UUID.randomUUID(), - "Foo", - 0, - Clock.systemDefaultZone(), - chatRoomService, - 8); - Message.MessageKey key = Message.MessageKey.of(user, messageId); - LocalDateTime timestamp = LocalDateTime.now(); - Message message = new Message(key, 0l, timestamp, "Bar"); - when(chatRoomService.getMessage(any(Message.MessageKey.class))).thenReturn(Mono.just(message)); - - // When - Mono mono = chatRoom.getMessage(user, messageId); - - // Then - assertThat(mono).emitsExactly(message); - } - - @Test - @DisplayName("Assert, that Mono if empty, if message does not exists") - void testGetNonExistentMessage() - { - // Given - String user = "foo"; - Long messageId = 1l; - ChatRoomService chatRoomService = mock(ChatRoomService.class); - ChatRoom chatRoom = new ChatRoom( - UUID.randomUUID(), - "Foo", - 0, - Clock.systemDefaultZone(), - chatRoomService, - 8); - when(chatRoomService.getMessage(any(Message.MessageKey.class))).thenReturn(Mono.empty()); - - // When - Mono mono = chatRoom.getMessage(user, messageId); - - // Then - assertThat(mono).emitsCount(0); - } - - @Test - @DisplayName("Assert, that Mono emits expected message, if a new message is added") - void testAddNewMessage() - { - // Given - String user = "foo"; - Long messageId = 1l; - ChatRoomService chatRoomService = mock(ChatRoomService.class); - ChatRoom chatRoom = new ChatRoom( - UUID.randomUUID(), - "Foo", - 0, - Clock.systemDefaultZone(), - chatRoomService, - 8); - Message.MessageKey key = Message.MessageKey.of(user, messageId); - Clock now = Clock.fixed(Instant.now(), ZoneId.systemDefault()); - LocalDateTime timestamp = LocalDateTime.now(now); - String messageText = "Bar"; - Message message = new Message(key, 0l, timestamp, messageText); - when(chatRoomService.getMessage(any(Message.MessageKey.class))).thenReturn(Mono.empty()); - when(chatRoomService.persistMessage(any(Message.MessageKey.class), any(LocalDateTime.class), any(String.class))).thenReturn(Mono.just(message)); - - // When - Mono mono = chatRoom.addMessage(messageId, user, messageText); - - // Then - assertThat(mono).emitsExactly(message); - } - - @Test - @DisplayName("Assert, that Mono emits expected message, if an unchanged message is added") - void testAddUnchangedMessage() - { - // Given - String user = "foo"; - Long messageId = 1l; - ChatRoomService chatRoomService = mock(ChatRoomService.class); - ChatRoom chatRoom = new ChatRoom( - UUID.randomUUID(), - "Foo", - 0, - Clock.systemDefaultZone(), - chatRoomService, - 8); - Message.MessageKey key = Message.MessageKey.of(user, messageId); - Clock now = Clock.fixed(Instant.now(), ZoneId.systemDefault()); - LocalDateTime timestamp = LocalDateTime.now(now); - String messageText = "Bar"; - Message message = new Message(key, 0l, timestamp, messageText); - when(chatRoomService.getMessage(any(Message.MessageKey.class))).thenReturn(Mono.just(message)); - when(chatRoomService.persistMessage(any(Message.MessageKey.class), any(LocalDateTime.class), any(String.class))).thenReturn(Mono.just(message)); - - // When - Mono mono = chatRoom.addMessage(messageId, user, messageText); - - // Then - assertThat(mono).emitsExactly(message); - } - - @Test - @DisplayName("Assert, that Mono sends an error, if a message is added again with mutated text") - void testAddMutatedMessage() - { - // Given - String user = "foo"; - Long messageId = 1l; - ChatRoomService chatRoomService = mock(ChatRoomService.class); - ChatRoom chatRoom = new ChatRoom( - UUID.randomUUID(), - "Foo", - 0, - Clock.systemDefaultZone(), - chatRoomService, - 8); - Message.MessageKey key = Message.MessageKey.of(user, messageId); - Clock now = Clock.fixed(Instant.now(), ZoneId.systemDefault()); - LocalDateTime timestamp = LocalDateTime.now(now); - String messageText = "Bar"; - String mutatedText = "Boom!"; - Message message = new Message(key, 0l, timestamp, messageText); - when(chatRoomService.getMessage(any(Message.MessageKey.class))).thenReturn(Mono.just(message)); - when(chatRoomService.persistMessage(any(Message.MessageKey.class), any(LocalDateTime.class), any(String.class))).thenReturn(Mono.just(message)); - - // When - Mono mono = chatRoom.addMessage(messageId, user, mutatedText); - - // Then - assertThat(mono).sendsError(); - } -}