From: Kai Moritz Date: Mon, 9 Jan 2023 18:52:05 +0000 (+0100) Subject: feat: Moved persistence-logic from `ChatHome` into `ChatHomeService` X-Git-Tag: wip-sharding~53 X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=212c5eb7912fd15768ddab961d104b27acc620a0;p=demos%2Fkafka%2Fchat feat: Moved persistence-logic from `ChatHome` into `ChatHomeService` - Aligned configuration and tests accordingly. - Also fixed some camel-case typos. --- diff --git a/src/main/java/de/juplo/kafka/chat/backend/ChatBackendApplication.java b/src/main/java/de/juplo/kafka/chat/backend/ChatBackendApplication.java index f98a02a7..80a46c00 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/ChatBackendApplication.java +++ b/src/main/java/de/juplo/kafka/chat/backend/ChatBackendApplication.java @@ -32,7 +32,7 @@ public class ChatBackendApplication implements WebFluxConfigurer @PreDestroy public void onExit() { - storageStrategy.writeChatrooms(chatHome.list()); + storageStrategy.writeChatrooms(chatHome.getChatRooms()); } public static void main(String[] args) diff --git a/src/main/java/de/juplo/kafka/chat/backend/ChatBackendConfiguration.java b/src/main/java/de/juplo/kafka/chat/backend/ChatBackendConfiguration.java index 05fc2cb7..68ccc4fe 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/ChatBackendConfiguration.java +++ b/src/main/java/de/juplo/kafka/chat/backend/ChatBackendConfiguration.java @@ -19,31 +19,34 @@ import java.time.Clock; public class ChatBackendConfiguration { @Bean - public ChatHome chatHome( - ChatHomeService chatHomeService, - StorageStrategy storageStrategy) + public ChatHome chatHome(ChatHomeService chatHomeService) { - return new ChatHome(chatHomeService, storageStrategy.readChatrooms()); + return new ChatHome(chatHomeService); } @Bean - public StorageStrategy storageStrategy( - ChatBackendProperties properties, - ObjectMapper mapper, - InMemoryChatHomeService chatHomeService) + InMemoryChatHomeService chatHomeService( + StorageStrategy storageStrategy, + Clock clock, + ChatBackendProperties properties) { - return new LocalJsonFilesStorageStrategy( - Paths.get(properties.getDatadir()), - mapper, - chatHomeService); + return new InMemoryChatHomeService( + storageStrategy.readChatrooms(), + clock, + properties.getChatroomBufferSize()); } @Bean - InMemoryChatHomeService chatHomeService( + public StorageStrategy storageStrategy( + ChatBackendProperties properties, Clock clock, - ChatBackendProperties properties) + ObjectMapper mapper) { - return new InMemoryChatHomeService(clock, properties.getChatroomBufferSize()); + return new LocalJsonFilesStorageStrategy( + Paths.get(properties.getDatadir()), + clock, + properties.getChatroomBufferSize(), + mapper); } @Bean diff --git a/src/main/java/de/juplo/kafka/chat/backend/api/ChatBackendController.java b/src/main/java/de/juplo/kafka/chat/backend/api/ChatBackendController.java index 51ed6a20..e7a6f1a5 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/api/ChatBackendController.java +++ b/src/main/java/de/juplo/kafka/chat/backend/api/ChatBackendController.java @@ -29,14 +29,14 @@ public class ChatBackendController @GetMapping("list") public Flux list() { - return chatHome.list().map(chatroom -> ChatRoomTo.from(chatroom)); + return chatHome.getChatRooms().map(chatroom -> ChatRoomTo.from(chatroom)); } @GetMapping("list/{chatroomId}") public Flux list(@PathVariable UUID chatroomId) { return chatHome - .getChatroom(chatroomId) + .getChatRoom(chatroomId) .flatMapMany(chatroom -> chatroom .getMessages() .map(MessageTo::from)); @@ -45,7 +45,7 @@ public class ChatBackendController @GetMapping("get/{chatroomId}") public Mono get(@PathVariable UUID chatroomId) { - return chatHome.getChatroom(chatroomId).map(chatroom -> ChatRoomTo.from(chatroom)); + return chatHome.getChatRoom(chatroomId).map(chatroom -> ChatRoomTo.from(chatroom)); } @PutMapping("put/{chatroomId}/{username}/{messageId}") @@ -57,7 +57,7 @@ public class ChatBackendController { return chatHome - .getChatroom(chatroomId) + .getChatRoom(chatroomId) .flatMap(chatroom -> put(chatroom, username, messageId, text)); } @@ -84,7 +84,7 @@ public class ChatBackendController { return chatHome - .getChatroom(chatroomId) + .getChatRoom(chatroomId) .flatMap(chatroom -> get(chatroom, username, messageId)); } @@ -103,7 +103,7 @@ public class ChatBackendController public Flux> listen(@PathVariable UUID chatroomId) { return chatHome - .getChatroom(chatroomId) + .getChatRoom(chatroomId) .flatMapMany(chatroom -> listen(chatroom)); } @@ -124,6 +124,6 @@ public class ChatBackendController @PostMapping("/store") public void store() { - storageStrategy.writeChatrooms(chatHome.list()); + storageStrategy.writeChatrooms(chatHome.getChatRooms()); } } diff --git a/src/main/java/de/juplo/kafka/chat/backend/domain/ChatHome.java b/src/main/java/de/juplo/kafka/chat/backend/domain/ChatHome.java index bb4d89c5..1450d3e3 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/domain/ChatHome.java +++ b/src/main/java/de/juplo/kafka/chat/backend/domain/ChatHome.java @@ -1,5 +1,6 @@ package de.juplo.kafka.chat.backend.domain; +import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -8,36 +9,25 @@ import java.util.*; @Slf4j +@RequiredArgsConstructor public class ChatHome { - private final Map chatrooms; private final ChatHomeService service; - public ChatHome(ChatHomeService service, Flux chatroomFlux) - { - log.debug("Creating ChatHome with factory: {}", service); - this.service = service; - this.chatrooms = new HashMap<>(); - chatroomFlux.subscribe(chatroom -> chatrooms.put(chatroom.getId(), chatroom)); - } - public Mono createChatroom(String name) { - ChatRoom chatroom = service.createChatroom(UUID.randomUUID(), name); - chatrooms.put(chatroom.getId(), chatroom); - return Mono.just(chatroom); + return service.createChatRoom(UUID.randomUUID(), name); } - public Mono getChatroom(UUID id) + public Mono getChatRoom(UUID id) { - ChatRoom chatroom = chatrooms.get(id); - return chatroom == null - ? Mono.error(() -> new UnknownChatroomException(id)) - : Mono.just(chatroom); + return service + .getChatRoom(id) + .switchIfEmpty(Mono.error(() -> new UnknownChatroomException(id))); } - public Flux list() + public Flux getChatRooms() { - return Flux.fromStream(chatrooms.values().stream()); + return service.getChatRooms(); } } diff --git a/src/main/java/de/juplo/kafka/chat/backend/domain/ChatHomeService.java b/src/main/java/de/juplo/kafka/chat/backend/domain/ChatHomeService.java index 69ed5f42..5743655b 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/domain/ChatHomeService.java +++ b/src/main/java/de/juplo/kafka/chat/backend/domain/ChatHomeService.java @@ -1,9 +1,14 @@ package de.juplo.kafka.chat.backend.domain; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + import java.util.UUID; public interface ChatHomeService { - ChatRoom createChatroom(UUID id, String name); + Mono createChatRoom(UUID id, String name); + Mono getChatRoom(UUID id); + Flux getChatRooms(); } diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/InMemoryChatHomeService.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/InMemoryChatHomeService.java index df0ecba5..b2f94ec3 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/InMemoryChatHomeService.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/InMemoryChatHomeService.java @@ -2,33 +2,56 @@ package de.juplo.kafka.chat.backend.persistence; import de.juplo.kafka.chat.backend.domain.ChatRoom; import de.juplo.kafka.chat.backend.domain.ChatHomeService; -import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; import java.time.Clock; +import java.util.HashMap; import java.util.LinkedHashMap; +import java.util.Map; import java.util.UUID; -@RequiredArgsConstructor +@Slf4j public class InMemoryChatHomeService implements ChatHomeService { + private final Map chatrooms; private final Clock clock; private final int bufferSize; + public InMemoryChatHomeService( + Flux chatroomFlux, + Clock clock, + int bufferSize) + { + log.debug("Creating InMemoryChatHomeService with buffer-size {} (for created ChatRoom's)", bufferSize); + this.chatrooms = new HashMap<>(); + chatroomFlux.subscribe(chatroom -> chatrooms.put(chatroom.getId(), chatroom)); + this.clock = clock; + this.bufferSize = bufferSize; + } + @Override - public ChatRoom createChatroom(UUID id, String name) + public Mono createChatRoom(UUID id, String name) { InMemoryChatRoomService service = new InMemoryChatRoomService(new LinkedHashMap<>()); - return new ChatRoom(id, name, clock, service, bufferSize); + ChatRoom chatRoom = new ChatRoom(id, name, clock, service, bufferSize); + chatrooms.put(chatRoom.getId(), chatRoom); + return Mono.just(chatRoom); + } + + @Override + public Mono getChatRoom(UUID id) + { + return Mono.justOrEmpty(chatrooms.get(id)); } - public ChatRoom restoreChatroom( - UUID id, - String name, - InMemoryChatRoomService service) + @Override + public Flux getChatRooms() { - return new ChatRoom(id, name, clock, service, bufferSize); + return Flux.fromStream(chatrooms.values().stream()); } } diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/InMemoryChatRoomService.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/InMemoryChatRoomService.java index 49d400b9..f789d359 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/InMemoryChatRoomService.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/InMemoryChatRoomService.java @@ -23,7 +23,7 @@ public class InMemoryChatRoomService implements ChatRoomService public InMemoryChatRoomService(Flux messageFlux) { - log.debug("Creating InMemoryChatroomService"); + log.debug("Creating InMemoryChatRoomService"); messages = new LinkedHashMap<>(); messageFlux.subscribe(message -> messages.put(message.getKey(), message)); } diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/LocalJsonFilesStorageStrategy.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/LocalJsonFilesStorageStrategy.java index 706fbe30..7b490bf6 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/LocalJsonFilesStorageStrategy.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/LocalJsonFilesStorageStrategy.java @@ -14,6 +14,7 @@ import reactor.core.publisher.Flux; import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; +import java.time.Clock; import static java.nio.file.StandardOpenOption.CREATE; import static java.nio.file.StandardOpenOption.TRUNCATE_EXISTING; @@ -27,8 +28,9 @@ public class LocalJsonFilesStorageStrategy implements StorageStrategy private final Path storagePath; + private final Clock clock; + private final int bufferSize; private final ObjectMapper mapper; - private final InMemoryChatHomeService service; @Override @@ -102,10 +104,12 @@ public class LocalJsonFilesStorageStrategy implements StorageStrategy { InMemoryChatRoomService chatroomService = new InMemoryChatRoomService(readMessages(chatRoomTo)); - return service.restoreChatroom( + return new ChatRoom( chatRoomTo.getId(), chatRoomTo.getName(), - chatroomService); + clock, + chatroomService, + bufferSize); }); } diff --git a/src/test/java/de/juplo/kafka/chat/backend/api/ChatBackendControllerTest.java b/src/test/java/de/juplo/kafka/chat/backend/api/ChatBackendControllerTest.java index 8947a03c..5e3c9283 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/api/ChatBackendControllerTest.java +++ b/src/test/java/de/juplo/kafka/chat/backend/api/ChatBackendControllerTest.java @@ -34,7 +34,7 @@ public class ChatBackendControllerTest { // Given UUID chatroomId = UUID.randomUUID(); - when(chatHome.getChatroom(any(UUID.class))) + when(chatHome.getChatRoom(any(UUID.class))) .thenReturn(Mono.error(() -> new UnknownChatroomException(chatroomId))); // When @@ -55,7 +55,7 @@ public class ChatBackendControllerTest { // Given UUID chatroomId = UUID.randomUUID(); - when(chatHome.getChatroom(any(UUID.class))) + when(chatHome.getChatRoom(any(UUID.class))) .thenReturn(Mono.error(() -> new UnknownChatroomException(chatroomId))); // When @@ -77,7 +77,7 @@ public class ChatBackendControllerTest UUID chatroomId = UUID.randomUUID(); String username = "foo"; Long messageId = 66l; - when(chatHome.getChatroom(any(UUID.class))) + when(chatHome.getChatRoom(any(UUID.class))) .thenReturn(Mono.error(() -> new UnknownChatroomException(chatroomId))); // When @@ -104,7 +104,7 @@ public class ChatBackendControllerTest UUID chatroomId = UUID.randomUUID(); String username = "foo"; Long messageId = 66l; - when(chatHome.getChatroom(any(UUID.class))) + when(chatHome.getChatRoom(any(UUID.class))) .thenReturn(Mono.error(() -> new UnknownChatroomException(chatroomId))); // When @@ -128,7 +128,7 @@ public class ChatBackendControllerTest { // Given UUID chatroomId = UUID.randomUUID(); - when(chatHome.getChatroom(any(UUID.class))) + when(chatHome.getChatRoom(any(UUID.class))) .thenReturn(Mono.error(() -> new UnknownChatroomException(chatroomId))); // When @@ -162,7 +162,7 @@ public class ChatBackendControllerTest String username = "foo"; Long messageId = 66l; ChatRoom chatRoom = mock(ChatRoom.class); - when(chatHome.getChatroom(any(UUID.class))) + when(chatHome.getChatRoom(any(UUID.class))) .thenReturn(Mono.just(chatRoom)); Message.MessageKey key = Message.MessageKey.of("foo", 1l); LocalDateTime timestamp = LocalDateTime.now(); diff --git a/src/test/java/de/juplo/kafka/chat/backend/domain/ChatHomeTest.java b/src/test/java/de/juplo/kafka/chat/backend/domain/ChatHomeTest.java index 655c1b8c..aba4d4a8 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/domain/ChatHomeTest.java +++ b/src/test/java/de/juplo/kafka/chat/backend/domain/ChatHomeTest.java @@ -2,13 +2,14 @@ package de.juplo.kafka.chat.backend.domain; import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Test; -import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import java.time.Clock; import java.util.UUID; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; import static pl.rzrz.assertj.reactor.Assertions.assertThat; @@ -20,13 +21,17 @@ public class ChatHomeTest { // Given ChatHomeService chatHomeService = mock(ChatHomeService.class); - ChatRoomService chatRoomService = mock(ChatRoomService.class); - UUID chatroomId = UUID.randomUUID(); - ChatRoom chatRoom = new ChatRoom(chatroomId, "Foo", Clock.systemDefaultZone(), chatRoomService, 8); - ChatHome chatHome = new ChatHome(chatHomeService, Flux.just(chatRoom)); + ChatRoom chatRoom = new ChatRoom( + UUID.randomUUID(), + "Foo", + Clock.systemDefaultZone(), + mock(ChatRoomService.class), + 8); + when(chatHomeService.getChatRoom(any(UUID.class))).thenReturn(Mono.just(chatRoom)); + ChatHome chatHome = new ChatHome(chatHomeService); // When - Mono mono = chatHome.getChatroom(chatroomId); + Mono mono = chatHome.getChatRoom(chatRoom.getId()); // Then assertThat(mono).emitsExactly(chatRoom); @@ -38,10 +43,11 @@ public class ChatHomeTest { // Given ChatHomeService chatHomeService = mock(ChatHomeService.class); - ChatHome chatHome = new ChatHome(chatHomeService, Flux.empty()); + when(chatHomeService.getChatRoom(any(UUID.class))).thenReturn(Mono.empty()); + ChatHome chatHome = new ChatHome(chatHomeService); // When - Mono mono = chatHome.getChatroom(UUID.randomUUID()); + Mono mono = chatHome.getChatRoom(UUID.randomUUID()); // Then assertThat(mono).sendsError(); diff --git a/src/test/java/de/juplo/kafka/chat/backend/persistence/LocalJsonFilesStorageStrategyIT.java b/src/test/java/de/juplo/kafka/chat/backend/persistence/LocalJsonFilesStorageStrategyIT.java index a4ff04fe..fd413c3b 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/persistence/LocalJsonFilesStorageStrategyIT.java +++ b/src/test/java/de/juplo/kafka/chat/backend/persistence/LocalJsonFilesStorageStrategyIT.java @@ -25,24 +25,24 @@ public class LocalJsonFilesStorageStrategyIT { final static Path path = Paths.get("target","local-json-files"); - InMemoryChatHomeService service; + InMemoryChatHomeService chatHomeService; StorageStrategy storageStrategy; ChatHome chathome; void start() { Clock clock = Clock.systemDefaultZone(); - service = new InMemoryChatHomeService(clock, 8); ObjectMapper mapper = new ObjectMapper(); mapper.registerModule(new JavaTimeModule()); mapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS); - storageStrategy = new LocalJsonFilesStorageStrategy(path, mapper, service); - chathome = new ChatHome(service, storageStrategy.readChatrooms()); + storageStrategy = new LocalJsonFilesStorageStrategy(path, clock, 8, mapper); + chatHomeService = new InMemoryChatHomeService(storageStrategy.readChatrooms(), clock, 8); + chathome = new ChatHome(chatHomeService); } void stop() { - storageStrategy.writeChatrooms(chathome.list()); + storageStrategy.writeChatrooms(chathome.getChatRooms()); } @Test @@ -50,7 +50,7 @@ public class LocalJsonFilesStorageStrategyIT { start(); - assertThat(chathome.list().toStream()).hasSize(0); + assertThat(chathome.getChatRooms().toStream()).hasSize(0); ChatRoom chatroom = chathome.createChatroom("FOO").block(); Message m1 = chatroom.addMessage(1l,"Peter", "Hallo, ich heiße Peter!").block(); @@ -58,19 +58,19 @@ public class LocalJsonFilesStorageStrategyIT Message m3 = chatroom.addMessage(2l, "Peter", "Willst du mit mir gehen?").block(); Message m4 = chatroom.addMessage(1l, "Klaus", "Ja? Nein? Vielleicht??").block(); - assertThat(chathome.list().toStream()).containsExactlyElementsOf(List.of(chatroom)); - assertThat(chathome.getChatroom(chatroom.getId())).emitsExactly(chatroom); + assertThat(chathome.getChatRooms().toStream()).containsExactlyElementsOf(List.of(chatroom)); + assertThat(chathome.getChatRoom(chatroom.getId())).emitsExactly(chatroom); assertThat(chathome - .getChatroom(chatroom.getId()) + .getChatRoom(chatroom.getId()) .flatMapMany(cr -> cr.getMessages())).emitsExactly(m1, m2, m3, m4); stop(); start(); - assertThat(chathome.list().toStream()).containsExactlyElementsOf(List.of(chatroom)); - assertThat(chathome.getChatroom(chatroom.getId())).emitsExactly(chatroom); + assertThat(chathome.getChatRooms().toStream()).containsExactlyElementsOf(List.of(chatroom)); + assertThat(chathome.getChatRoom(chatroom.getId())).emitsExactly(chatroom); assertThat(chathome - .getChatroom(chatroom.getId()) + .getChatRoom(chatroom.getId()) .flatMapMany(cr -> cr.getMessages())).emitsExactly(m1, m2, m3, m4); }