]> juplo.de Git - demos/kafka/chat/commitdiff
WIP:mongodb map vs subscribe - subscribe rausgezogen
authorKai Moritz <kai@juplo.de>
Sun, 18 Feb 2024 18:49:09 +0000 (19:49 +0100)
committerKai Moritz <kai@juplo.de>
Sun, 18 Feb 2024 18:49:09 +0000 (19:49 +0100)
src/main/java/de/juplo/kafka/chat/backend/ChatBackendApplication.java
src/main/java/de/juplo/kafka/chat/backend/api/ChatBackendController.java
src/main/java/de/juplo/kafka/chat/backend/implementation/StorageStrategy.java
src/test/java/de/juplo/kafka/chat/backend/AbstractStorageStrategyIT.java

index 1eaa88c72fdf8d932d9d29a9750854cc8a7739c3..76debbedad052df2f4d4f6392b0ac6b191950cae 100644 (file)
@@ -32,7 +32,9 @@ public class ChatBackendApplication implements WebFluxConfigurer
        @PreDestroy
        public void onExit()
        {
-               storageStrategy.write(chatHomeService);
+               storageStrategy
+                               .write(chatHomeService)
+                               .subscribe();
        }
 
        public static void main(String[] args)
index f3efe791d098f850ee90ef963ec8530094916ea9..acb84e64e4f0166c1dcda1d3de76784198cdeeec 100644 (file)
@@ -137,6 +137,6 @@ public class ChatBackendController
   @PostMapping("/store")
   public void store()
   {
-    storageStrategy.write(chatHomeService);
+    storageStrategy.write(chatHomeService).subscribe();
   }
 }
index a62f4082e852e8886fc33ff14383e36d204e7d4a..4b43f882ffcda51dc4f06d186e82ad539af8e35e 100644 (file)
@@ -16,9 +16,9 @@ public interface StorageStrategy
 {
   Logger log = LoggerFactory.getLogger(StorageStrategy.class.getCanonicalName());
 
-  default void write(ChatHomeService chatHomeService)
+  default Flux<ChatRoomInfo> write(ChatHomeService chatHomeService)
   {
-    writeChatRoomInfo(
+    return writeChatRoomInfo(
         chatHomeService
             .getChatRoomInfo()
             .doOnNext(chatRoomInfo ->
@@ -28,7 +28,7 @@ public interface StorageStrategy
                         .getChatRoomData(chatRoomInfo.getId())
                         .flatMapMany(chatRoomData -> chatRoomData.getMessages()),
                     this::logSuccess,
-                    this::logFailure)));
+                    this::logFailure).subscribe()));
   }
 
   Flux<ChatRoomInfo> writeChatRoomInfo(Flux<ChatRoomInfo> chatRoomInfoFlux);
index 5eaf5417da26cf39e5d7c45891f34c444c8c5827..41e80ed722926bd7ded510f0eb660b75f84ba411 100644 (file)
@@ -28,7 +28,9 @@ public abstract class AbstractStorageStrategyIT
 
   protected void stop()
   {
-    getStorageStrategy().write(chathome);
+    getStorageStrategy()
+        .write(chathome)
+        .subscribe();
   }
 
   @Test