From 0eea18f0e90e71b5b2a5d2461ab9ea528ad6e0a1 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Mon, 19 Feb 2024 15:01:58 +0100 Subject: [PATCH] WIP:mongodb map vs subscribe - subscribe rausgezogen --- .../chat/backend/ChatBackendApplication.java | 4 +++- .../backend/api/ChatBackendController.java | 2 +- .../implementation/StorageStrategy.java | 20 ++++++++++--------- .../storage/files/FilesStorageStrategy.java | 14 ++++++------- .../backend/storage/mongodb/ChatRoomTo.java | 9 +++++++++ .../mongodb/MongoDbStorageStrategy.java | 12 +++++------ .../nostorage/NoStorageStorageStrategy.java | 19 ++++++++++++------ .../backend/AbstractStorageStrategyIT.java | 4 +++- 8 files changed, 52 insertions(+), 32 deletions(-) diff --git a/src/main/java/de/juplo/kafka/chat/backend/ChatBackendApplication.java b/src/main/java/de/juplo/kafka/chat/backend/ChatBackendApplication.java index 1eaa88c7..76debbed 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/ChatBackendApplication.java +++ b/src/main/java/de/juplo/kafka/chat/backend/ChatBackendApplication.java @@ -32,7 +32,9 @@ public class ChatBackendApplication implements WebFluxConfigurer @PreDestroy public void onExit() { - storageStrategy.write(chatHomeService); + storageStrategy + .write(chatHomeService) + .subscribe(); } public static void main(String[] args) diff --git a/src/main/java/de/juplo/kafka/chat/backend/api/ChatBackendController.java b/src/main/java/de/juplo/kafka/chat/backend/api/ChatBackendController.java index f3efe791..acb84e64 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/api/ChatBackendController.java +++ b/src/main/java/de/juplo/kafka/chat/backend/api/ChatBackendController.java @@ -137,6 +137,6 @@ public class ChatBackendController @PostMapping("/store") public void store() { - storageStrategy.write(chatHomeService); + storageStrategy.write(chatHomeService).subscribe(); } } diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/StorageStrategy.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/StorageStrategy.java index 499a7387..6fa9c163 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/StorageStrategy.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/StorageStrategy.java @@ -16,20 +16,20 @@ public interface StorageStrategy { Logger log = LoggerFactory.getLogger(StorageStrategy.class.getCanonicalName()); - default void write(ChatHomeService chatHomeService) + default Flux write(ChatHomeService chatHomeService) { - write( + return write( chatHomeService, this::logSuccessChatHomeService, this::logFailureChatHomeService); } - default void write( + default Flux write( ChatHomeService chatHomeService, ChatHomeServiceWrittenSuccessCallback successCallback, ChatHomeServiceWrittenFailureCallback failureCallback) { - writeChatRoomInfo( + return writeChatRoomInfo( chatHomeService .getChatRoomInfo() .doOnComplete(() -> successCallback.accept(chatHomeService)) @@ -40,24 +40,26 @@ public interface StorageStrategy .getChatRoomData(chatRoomInfo.getId()) .flatMapMany(chatRoomData -> chatRoomData.getMessages()), this::logSuccessChatRoom, - this::logFailureChatRoom))); + this::logFailureChatRoom + ) + .subscribe())); } - void writeChatRoomInfo(Flux chatRoomInfoFlux); + Flux writeChatRoomInfo(Flux chatRoomInfoFlux); Flux readChatRoomInfo(); - default void writeChatRoomData( + default Flux writeChatRoomData( UUID chatRoomId, Flux messageFlux, ChatRoomWrittenSuccessCallback successCallback, ChatRoomWrittenFailureCallback failureCallback) { - writeChatRoomData( + return writeChatRoomData( chatRoomId, messageFlux .doOnComplete(() -> successCallback.accept(chatRoomId)) .doOnError(throwable -> failureCallback.accept(chatRoomId, throwable))); } - void writeChatRoomData(UUID chatRoomId, Flux messageFlux); + Flux writeChatRoomData(UUID chatRoomId, Flux messageFlux); Flux readChatRoomData(UUID chatRoomId); interface ChatHomeServiceWrittenSuccessCallback extends Consumer {} diff --git a/src/main/java/de/juplo/kafka/chat/backend/storage/files/FilesStorageStrategy.java b/src/main/java/de/juplo/kafka/chat/backend/storage/files/FilesStorageStrategy.java index 7e04a964..cdb4f0d7 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/storage/files/FilesStorageStrategy.java +++ b/src/main/java/de/juplo/kafka/chat/backend/storage/files/FilesStorageStrategy.java @@ -35,7 +35,7 @@ public class FilesStorageStrategy implements StorageStrategy @Override - public void writeChatRoomInfo(Flux chatRoomInfoFlux) + public Flux writeChatRoomInfo(Flux chatRoomInfoFlux) { Path path = chatroomsPath(); log.info("Writing chatrooms to {}", path); @@ -48,7 +48,7 @@ public class FilesStorageStrategy implements StorageStrategy .getFactory() .createGenerator(Files.newBufferedWriter(path, CREATE, TRUNCATE_EXISTING)); - chatRoomInfoFlux + return chatRoomInfoFlux .log() .doFirst(() -> { @@ -86,8 +86,7 @@ public class FilesStorageStrategy implements StorageStrategy { throw new RuntimeException(e); } - }) - .subscribe(); + }); } catch (IOException e) { @@ -121,7 +120,7 @@ public class FilesStorageStrategy implements StorageStrategy } @Override - public void writeChatRoomData( + public Flux writeChatRoomData( UUID chatRoomId, Flux messageFlux) { @@ -136,7 +135,7 @@ public class FilesStorageStrategy implements StorageStrategy .getFactory() .createGenerator(Files.newBufferedWriter(path, CREATE, TRUNCATE_EXISTING)); - messageFlux + return messageFlux .log() .doFirst(() -> { @@ -174,8 +173,7 @@ public class FilesStorageStrategy implements StorageStrategy { throw new RuntimeException(e); } - }) - .subscribe(); + }); } catch (IOException e) { diff --git a/src/main/java/de/juplo/kafka/chat/backend/storage/mongodb/ChatRoomTo.java b/src/main/java/de/juplo/kafka/chat/backend/storage/mongodb/ChatRoomTo.java index 853ee1cf..47596a2e 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/storage/mongodb/ChatRoomTo.java +++ b/src/main/java/de/juplo/kafka/chat/backend/storage/mongodb/ChatRoomTo.java @@ -5,6 +5,8 @@ import lombok.*; import org.springframework.data.annotation.Id; import org.springframework.data.mongodb.core.mapping.Document; +import java.util.UUID; + @AllArgsConstructor @NoArgsConstructor @@ -19,6 +21,13 @@ public class ChatRoomTo private String id; private String name; + public ChatRoomInfo toChatRoomInfo() + { + return new ChatRoomInfo( + UUID.fromString(id), + name); + } + public static ChatRoomTo from(ChatRoomInfo chatRoomInfo) { return new ChatRoomTo( diff --git a/src/main/java/de/juplo/kafka/chat/backend/storage/mongodb/MongoDbStorageStrategy.java b/src/main/java/de/juplo/kafka/chat/backend/storage/mongodb/MongoDbStorageStrategy.java index 780d64be..1428119e 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/storage/mongodb/MongoDbStorageStrategy.java +++ b/src/main/java/de/juplo/kafka/chat/backend/storage/mongodb/MongoDbStorageStrategy.java @@ -21,12 +21,12 @@ public class MongoDbStorageStrategy implements StorageStrategy @Override - public void writeChatRoomInfo(Flux chatRoomInfoFlux) + public Flux writeChatRoomInfo(Flux chatRoomInfoFlux) { - chatRoomInfoFlux + return chatRoomInfoFlux .map(ChatRoomTo::from) .map(chatroomTo -> chatRoomRepository.save(chatroomTo)) - .subscribe(); + .map(ChatRoomTo::toChatRoomInfo); } @Override @@ -42,12 +42,12 @@ public class MongoDbStorageStrategy implements StorageStrategy } @Override - public void writeChatRoomData(UUID chatRoomId, Flux messageFlux) + public Flux writeChatRoomData(UUID chatRoomId, Flux messageFlux) { - messageFlux + return messageFlux .map(message -> MessageTo.from(chatRoomId, message)) .map(messageTo -> messageRepository.save(messageTo)) - .subscribe(); + .map(MessageTo::toMessage); } @Override diff --git a/src/main/java/de/juplo/kafka/chat/backend/storage/nostorage/NoStorageStorageStrategy.java b/src/main/java/de/juplo/kafka/chat/backend/storage/nostorage/NoStorageStorageStrategy.java index 6ca08e22..59027424 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/storage/nostorage/NoStorageStorageStrategy.java +++ b/src/main/java/de/juplo/kafka/chat/backend/storage/nostorage/NoStorageStorageStrategy.java @@ -13,14 +13,18 @@ import java.util.UUID; @Slf4j public class NoStorageStorageStrategy implements StorageStrategy { - @Override - public void write(ChatHomeService chatHomeService) + public Flux write(ChatHomeService chatHomeService) { - log.info("Storage is disabled: Not storing {}", chatHomeService); + return Flux + .empty() + .doOnComplete(() -> log.info("Storage is disabled: Not storing {}", chatHomeService)); + } - @Override - public void writeChatRoomInfo(Flux chatRoomInfoFlux) {} + public Flux writeChatRoomInfo(Flux chatRoomInfoFlux) + { + return chatRoomInfoFlux; + } @Override public Flux readChatRoomInfo() @@ -29,7 +33,10 @@ public class NoStorageStorageStrategy implements StorageStrategy } @Override - public void writeChatRoomData(UUID chatRoomId, Flux messageFlux) {} + public Flux writeChatRoomData(UUID chatRoomId, Flux messageFlux) + { + return messageFlux; + } @Override public Flux readChatRoomData(UUID chatRoomId) diff --git a/src/test/java/de/juplo/kafka/chat/backend/AbstractStorageStrategyIT.java b/src/test/java/de/juplo/kafka/chat/backend/AbstractStorageStrategyIT.java index 5eaf5417..41e80ed7 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/AbstractStorageStrategyIT.java +++ b/src/test/java/de/juplo/kafka/chat/backend/AbstractStorageStrategyIT.java @@ -28,7 +28,9 @@ public abstract class AbstractStorageStrategyIT protected void stop() { - getStorageStrategy().write(chathome); + getStorageStrategy() + .write(chathome) + .subscribe(); } @Test -- 2.20.1