From: Kai Moritz Date: Sun, 8 Jan 2023 09:19:47 +0000 (+0100) Subject: refactore: Renamed `Chatroom` to `ChatRoom` -- Rename X-Git-Tag: wip~71 X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=a35ac0e02622cb5638380612064467c2309ebee5;p=demos%2Fkafka%2Fchat refactore: Renamed `Chatroom` to `ChatRoom` -- Rename --- diff --git a/src/main/java/de/juplo/kafka/chat/backend/api/ChatBackendController.java b/src/main/java/de/juplo/kafka/chat/backend/api/ChatBackendController.java index 12f8d67f..e7fc483c 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/api/ChatBackendController.java +++ b/src/main/java/de/juplo/kafka/chat/backend/api/ChatBackendController.java @@ -1,10 +1,9 @@ package de.juplo.kafka.chat.backend.api; import de.juplo.kafka.chat.backend.domain.ChatHome; -import de.juplo.kafka.chat.backend.domain.Chatroom; +import de.juplo.kafka.chat.backend.domain.ChatRoom; import de.juplo.kafka.chat.backend.persistence.StorageStrategy; import lombok.RequiredArgsConstructor; -import org.springframework.http.MediaType; import org.springframework.http.codec.ServerSentEvent; import org.springframework.web.bind.annotation.*; import reactor.core.publisher.Flux; @@ -70,7 +69,7 @@ public class ChatBackendController } public Mono put( - Chatroom chatroom, + ChatRoom chatroom, String username, Long messageId, String text) @@ -100,7 +99,7 @@ public class ChatBackendController } private Mono get( - Chatroom chatroom, + ChatRoom chatroom, String username, Long messageId) { @@ -119,7 +118,7 @@ public class ChatBackendController .orElseThrow(() -> new UnknownChatroomException(chatroomId)); } - private Flux> listen(Chatroom chatroom) + private Flux> listen(ChatRoom chatroom) { return chatroom .listen() diff --git a/src/main/java/de/juplo/kafka/chat/backend/api/ChatroomTo.java b/src/main/java/de/juplo/kafka/chat/backend/api/ChatroomTo.java index c4ad7848..773c9683 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/api/ChatroomTo.java +++ b/src/main/java/de/juplo/kafka/chat/backend/api/ChatroomTo.java @@ -1,6 +1,6 @@ package de.juplo.kafka.chat.backend.api; -import de.juplo.kafka.chat.backend.domain.Chatroom; +import de.juplo.kafka.chat.backend.domain.ChatRoom; import lombok.Data; import java.util.UUID; @@ -12,7 +12,7 @@ public class ChatroomTo private String name; - public static ChatroomTo from(Chatroom chatroom) + public static ChatroomTo from(ChatRoom chatroom) { ChatroomTo info = new ChatroomTo(); info.id = chatroom.getId(); diff --git a/src/main/java/de/juplo/kafka/chat/backend/domain/ChatHome.java b/src/main/java/de/juplo/kafka/chat/backend/domain/ChatHome.java index 773cd4cf..b31dedee 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/domain/ChatHome.java +++ b/src/main/java/de/juplo/kafka/chat/backend/domain/ChatHome.java @@ -10,10 +10,10 @@ import java.util.stream.Stream; @Slf4j public class ChatHome { - private final Map chatrooms; + private final Map chatrooms; private final ChatHomeService service; - public ChatHome(ChatHomeService service, Flux chatroomFlux) + public ChatHome(ChatHomeService service, Flux chatroomFlux) { log.debug("Creating ChatHome with factory: {}", service); this.service = service; @@ -21,19 +21,19 @@ public class ChatHome chatroomFlux.subscribe(chatroom -> chatrooms.put(chatroom.getId(), chatroom)); } - public Chatroom createChatroom(String name) + public ChatRoom createChatroom(String name) { - Chatroom chatroom = service.createChatroom(UUID.randomUUID(), name); + ChatRoom chatroom = service.createChatroom(UUID.randomUUID(), name); chatrooms.put(chatroom.getId(), chatroom); return chatroom; } - public Optional getChatroom(UUID id) + public Optional getChatroom(UUID id) { return Optional.ofNullable(chatrooms.get(id)); } - public Stream list() + public Stream list() { return chatrooms.values().stream(); } diff --git a/src/main/java/de/juplo/kafka/chat/backend/domain/ChatHomeService.java b/src/main/java/de/juplo/kafka/chat/backend/domain/ChatHomeService.java index 987839cc..5508f24a 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/domain/ChatHomeService.java +++ b/src/main/java/de/juplo/kafka/chat/backend/domain/ChatHomeService.java @@ -5,5 +5,5 @@ import java.util.UUID; public interface ChatHomeService { - Chatroom createChatroom(UUID id, String name); + ChatRoom createChatroom(UUID id, String name); } diff --git a/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoom.java b/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoom.java new file mode 100644 index 00000000..bed91c76 --- /dev/null +++ b/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoom.java @@ -0,0 +1,87 @@ +package de.juplo.kafka.chat.backend.domain; + +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.core.publisher.Sinks; + +import java.time.LocalDateTime; +import java.util.*; + + +@Slf4j +public class ChatRoom +{ + @Getter + private final UUID id; + @Getter + private final String name; + private final ChatroomService chatroomService; + private final int bufferSize; + private Sinks.Many sink; + + public ChatRoom( + UUID id, + String name, + ChatroomService chatroomService, + int bufferSize) + { + this.id = id; + this.name = name; + this.chatroomService = chatroomService; + this.bufferSize = bufferSize; + this.sink = createSink(); + } + + + synchronized public Mono addMessage( + Long id, + LocalDateTime timestamp, + String user, + String text) + { + return chatroomService + .persistMessage(Message.MessageKey.of(user, id), timestamp, text) + .doOnNext(message -> + { + Sinks.EmitResult result = sink.tryEmitNext(message); + if (result.isFailure()) + { + log.warn("Emitting of message failed with {} for {}", result.name(), message); + } + }); + } + + + public Mono getMessage(String username, Long messageId) + { + Message.MessageKey key = Message.MessageKey.of(username, messageId); + return chatroomService.getMessage(key); + } + + synchronized public Flux listen() + { + return sink + .asFlux() + .doOnCancel(() -> sink = createSink()); // Sink hast to be recreated on auto-cancel! + } + + public Flux getMessages() + { + return getMessages(0, Long.MAX_VALUE); + } + + public Flux getMessages(long first, long last) + { + return chatroomService.getMessages(first, last); + } + + private Sinks.Many createSink() + { + return Sinks + .many() + .multicast() + .onBackpressureBuffer(bufferSize); + } +} diff --git a/src/main/java/de/juplo/kafka/chat/backend/domain/Chatroom.java b/src/main/java/de/juplo/kafka/chat/backend/domain/Chatroom.java deleted file mode 100644 index 2261e022..00000000 --- a/src/main/java/de/juplo/kafka/chat/backend/domain/Chatroom.java +++ /dev/null @@ -1,87 +0,0 @@ -package de.juplo.kafka.chat.backend.domain; - -import lombok.Getter; -import lombok.extern.slf4j.Slf4j; -import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; -import reactor.core.publisher.Sinks; - -import java.time.LocalDateTime; -import java.util.*; - - -@Slf4j -public class Chatroom -{ - @Getter - private final UUID id; - @Getter - private final String name; - private final ChatroomService chatroomService; - private final int bufferSize; - private Sinks.Many sink; - - public Chatroom( - UUID id, - String name, - ChatroomService chatroomService, - int bufferSize) - { - this.id = id; - this.name = name; - this.chatroomService = chatroomService; - this.bufferSize = bufferSize; - this.sink = createSink(); - } - - - synchronized public Mono addMessage( - Long id, - LocalDateTime timestamp, - String user, - String text) - { - return chatroomService - .persistMessage(Message.MessageKey.of(user, id), timestamp, text) - .doOnNext(message -> - { - Sinks.EmitResult result = sink.tryEmitNext(message); - if (result.isFailure()) - { - log.warn("Emitting of message failed with {} for {}", result.name(), message); - } - }); - } - - - public Mono getMessage(String username, Long messageId) - { - Message.MessageKey key = Message.MessageKey.of(username, messageId); - return chatroomService.getMessage(key); - } - - synchronized public Flux listen() - { - return sink - .asFlux() - .doOnCancel(() -> sink = createSink()); // Sink hast to be recreated on auto-cancel! - } - - public Flux getMessages() - { - return getMessages(0, Long.MAX_VALUE); - } - - public Flux getMessages(long first, long last) - { - return chatroomService.getMessages(first, last); - } - - private Sinks.Many createSink() - { - return Sinks - .many() - .multicast() - .onBackpressureBuffer(bufferSize); - } -} diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/InMemoryChatHomeService.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/InMemoryChatHomeService.java index 5c7c4b94..006f9767 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/InMemoryChatHomeService.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/InMemoryChatHomeService.java @@ -1,6 +1,6 @@ package de.juplo.kafka.chat.backend.persistence; -import de.juplo.kafka.chat.backend.domain.Chatroom; +import de.juplo.kafka.chat.backend.domain.ChatRoom; import de.juplo.kafka.chat.backend.domain.ChatHomeService; import lombok.RequiredArgsConstructor; @@ -15,18 +15,18 @@ public class InMemoryChatHomeService implements ChatHomeService()); - return new Chatroom(id, name, chatroomService, bufferSize); + return new ChatRoom(id, name, chatroomService, bufferSize); } - public Chatroom restoreChatroom( + public ChatRoom restoreChatroom( UUID id, String name, InMemoryChatroomService chatroomService) { - return new Chatroom(id, name, chatroomService, bufferSize); + return new ChatRoom(id, name, chatroomService, bufferSize); } } diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/LocalJsonFilesStorageStrategy.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/LocalJsonFilesStorageStrategy.java index d2092b2b..d66c544c 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/LocalJsonFilesStorageStrategy.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/LocalJsonFilesStorageStrategy.java @@ -5,7 +5,7 @@ import com.fasterxml.jackson.databind.JavaType; import com.fasterxml.jackson.databind.ObjectMapper; import de.juplo.kafka.chat.backend.api.ChatroomTo; import de.juplo.kafka.chat.backend.api.MessageTo; -import de.juplo.kafka.chat.backend.domain.Chatroom; +import de.juplo.kafka.chat.backend.domain.ChatRoom; import de.juplo.kafka.chat.backend.domain.Message; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; @@ -32,7 +32,7 @@ public class LocalJsonFilesStorageStrategy implements StorageStrategy @Override - public void writeChatrooms(Flux chatroomFlux) + public void writeChatrooms(Flux chatroomFlux) { Path path = chatroomsPath(); log.info("Writing chatrooms to {}", path); @@ -92,7 +92,7 @@ public class LocalJsonFilesStorageStrategy implements StorageStrategy } @Override - public Flux readChatrooms() + public Flux readChatrooms() { JavaType type = mapper.getTypeFactory().constructType(ChatroomTo.class); return Flux diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/StorageStrategy.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/StorageStrategy.java index 38a9e98f..a337b61e 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/StorageStrategy.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/StorageStrategy.java @@ -1,15 +1,15 @@ package de.juplo.kafka.chat.backend.persistence; import de.juplo.kafka.chat.backend.api.ChatroomTo; -import de.juplo.kafka.chat.backend.domain.Chatroom; +import de.juplo.kafka.chat.backend.domain.ChatRoom; import de.juplo.kafka.chat.backend.domain.Message; import reactor.core.publisher.Flux; public interface StorageStrategy { - void writeChatrooms(Flux chatroomFlux); - Flux readChatrooms(); + void writeChatrooms(Flux chatroomFlux); + Flux readChatrooms(); void writeMessages(ChatroomTo chatroomTo, Flux messageFlux); Flux readMessages(ChatroomTo chatroomTo); }