From: Kai Moritz Date: Sun, 18 Feb 2024 18:49:09 +0000 (+0100) Subject: WIP:mongodb map vs subscribe - subscribe rausgezogen X-Git-Tag: rebase--2024-02-19--08-59~2 X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=47abab30f5b1c267599c2cf863d9d501ad1950e7;p=demos%2Fkafka%2Fchat WIP:mongodb map vs subscribe - subscribe rausgezogen --- diff --git a/src/main/java/de/juplo/kafka/chat/backend/ChatBackendApplication.java b/src/main/java/de/juplo/kafka/chat/backend/ChatBackendApplication.java index 1eaa88c7..76debbed 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/ChatBackendApplication.java +++ b/src/main/java/de/juplo/kafka/chat/backend/ChatBackendApplication.java @@ -32,7 +32,9 @@ public class ChatBackendApplication implements WebFluxConfigurer @PreDestroy public void onExit() { - storageStrategy.write(chatHomeService); + storageStrategy + .write(chatHomeService) + .subscribe(); } public static void main(String[] args) diff --git a/src/main/java/de/juplo/kafka/chat/backend/api/ChatBackendController.java b/src/main/java/de/juplo/kafka/chat/backend/api/ChatBackendController.java index f3efe791..acb84e64 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/api/ChatBackendController.java +++ b/src/main/java/de/juplo/kafka/chat/backend/api/ChatBackendController.java @@ -137,6 +137,6 @@ public class ChatBackendController @PostMapping("/store") public void store() { - storageStrategy.write(chatHomeService); + storageStrategy.write(chatHomeService).subscribe(); } } diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/StorageStrategy.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/StorageStrategy.java index a62f4082..4b43f882 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/StorageStrategy.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/StorageStrategy.java @@ -16,9 +16,9 @@ public interface StorageStrategy { Logger log = LoggerFactory.getLogger(StorageStrategy.class.getCanonicalName()); - default void write(ChatHomeService chatHomeService) + default Flux write(ChatHomeService chatHomeService) { - writeChatRoomInfo( + return writeChatRoomInfo( chatHomeService .getChatRoomInfo() .doOnNext(chatRoomInfo -> @@ -28,7 +28,7 @@ public interface StorageStrategy .getChatRoomData(chatRoomInfo.getId()) .flatMapMany(chatRoomData -> chatRoomData.getMessages()), this::logSuccess, - this::logFailure))); + this::logFailure).subscribe())); } Flux writeChatRoomInfo(Flux chatRoomInfoFlux); diff --git a/src/test/java/de/juplo/kafka/chat/backend/AbstractStorageStrategyIT.java b/src/test/java/de/juplo/kafka/chat/backend/AbstractStorageStrategyIT.java index 5eaf5417..41e80ed7 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/AbstractStorageStrategyIT.java +++ b/src/test/java/de/juplo/kafka/chat/backend/AbstractStorageStrategyIT.java @@ -28,7 +28,9 @@ public abstract class AbstractStorageStrategyIT protected void stop() { - getStorageStrategy().write(chathome); + getStorageStrategy() + .write(chathome) + .subscribe(); } @Test