refactor: One stream -> using `flatMap` instead of an inner `subscribe` rebase--2024-02-20--09-03
authorKai Moritz <kai@juplo.de>
Tue, 20 Feb 2024 07:50:56 +0000 (08:50 +0100)
committerKai Moritz <kai@juplo.de>
Tue, 20 Feb 2024 07:50:56 +0000 (08:50 +0100)
src/main/java/de/juplo/kafka/chat/backend/implementation/StorageStrategy.java

index 6b290fd..f9ce2b1 100644 (file)
@@ -6,6 +6,7 @@ import de.juplo.kafka.chat.backend.domain.Message;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
 
 import java.util.UUID;
 
@@ -19,16 +20,16 @@ public interface StorageStrategy
     return writeChatRoomInfo(
         chatHomeService
             .getChatRoomInfo()
-            .doOnNext(chatRoomInfo ->
+            .flatMap(chatRoomInfo ->
                 writeChatRoomData(
                     chatRoomInfo.getId(),
                     chatHomeService
                         .getChatRoomData(chatRoomInfo.getId())
                         .flatMapMany(chatRoomData -> chatRoomData.getMessages())
                         )
-                    .doOnComplete(() -> log.info("Stored {}", chatRoomInfo))
-                    .doOnError(throwable -> log.error("Could not store {}: {}", chatRoomInfo, throwable))
-                    .subscribe()))
+                    .then(Mono.just(chatRoomInfo))
+                    .doOnSuccess(empty -> log.info("Stored {}", chatRoomInfo))
+                    .doOnError(throwable -> log.error("Could not store {}: {}", chatRoomInfo, throwable))))
         .doOnComplete(() -> log.info("Stored {}", chatHomeService))
         .doOnError(throwable -> log.error("Could not store {}: {}", chatHomeService, throwable));
   }