From 47abab30f5b1c267599c2cf863d9d501ad1950e7 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sun, 18 Feb 2024 19:49:09 +0100 Subject: [PATCH] WIP:mongodb map vs subscribe - subscribe rausgezogen --- .../de/juplo/kafka/chat/backend/ChatBackendApplication.java | 4 +++- .../juplo/kafka/chat/backend/api/ChatBackendController.java | 2 +- .../kafka/chat/backend/implementation/StorageStrategy.java | 6 +++--- .../juplo/kafka/chat/backend/AbstractStorageStrategyIT.java | 4 +++- 4 files changed, 10 insertions(+), 6 deletions(-) diff --git a/src/main/java/de/juplo/kafka/chat/backend/ChatBackendApplication.java b/src/main/java/de/juplo/kafka/chat/backend/ChatBackendApplication.java index 1eaa88c7..76debbed 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/ChatBackendApplication.java +++ b/src/main/java/de/juplo/kafka/chat/backend/ChatBackendApplication.java @@ -32,7 +32,9 @@ public class ChatBackendApplication implements WebFluxConfigurer @PreDestroy public void onExit() { - storageStrategy.write(chatHomeService); + storageStrategy + .write(chatHomeService) + .subscribe(); } public static void main(String[] args) diff --git a/src/main/java/de/juplo/kafka/chat/backend/api/ChatBackendController.java b/src/main/java/de/juplo/kafka/chat/backend/api/ChatBackendController.java index f3efe791..acb84e64 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/api/ChatBackendController.java +++ b/src/main/java/de/juplo/kafka/chat/backend/api/ChatBackendController.java @@ -137,6 +137,6 @@ public class ChatBackendController @PostMapping("/store") public void store() { - storageStrategy.write(chatHomeService); + storageStrategy.write(chatHomeService).subscribe(); } } diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/StorageStrategy.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/StorageStrategy.java index a62f4082..4b43f882 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/StorageStrategy.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/StorageStrategy.java @@ -16,9 +16,9 @@ public interface StorageStrategy { Logger log = LoggerFactory.getLogger(StorageStrategy.class.getCanonicalName()); - default void write(ChatHomeService chatHomeService) + default Flux write(ChatHomeService chatHomeService) { - writeChatRoomInfo( + return writeChatRoomInfo( chatHomeService .getChatRoomInfo() .doOnNext(chatRoomInfo -> @@ -28,7 +28,7 @@ public interface StorageStrategy .getChatRoomData(chatRoomInfo.getId()) .flatMapMany(chatRoomData -> chatRoomData.getMessages()), this::logSuccess, - this::logFailure))); + this::logFailure).subscribe())); } Flux writeChatRoomInfo(Flux chatRoomInfoFlux); diff --git a/src/test/java/de/juplo/kafka/chat/backend/AbstractStorageStrategyIT.java b/src/test/java/de/juplo/kafka/chat/backend/AbstractStorageStrategyIT.java index 5eaf5417..41e80ed7 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/AbstractStorageStrategyIT.java +++ b/src/test/java/de/juplo/kafka/chat/backend/AbstractStorageStrategyIT.java @@ -28,7 +28,9 @@ public abstract class AbstractStorageStrategyIT protected void stop() { - getStorageStrategy().write(chathome); + getStorageStrategy() + .write(chathome) + .subscribe(); } @Test -- 2.20.1