WIP:fix-async
authorKai Moritz <kai@juplo.de>
Tue, 10 Jan 2023 20:29:58 +0000 (21:29 +0100)
committerKai Moritz <kai@juplo.de>
Tue, 10 Jan 2023 20:29:58 +0000 (21:29 +0100)
src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/MongoDbStorageStrategy.java

index 73f5e8a..8cbbb55 100644 (file)
@@ -4,6 +4,7 @@ import de.juplo.kafka.chat.backend.domain.ChatRoom;
 import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
 import de.juplo.kafka.chat.backend.persistence.storage.files.ChatRoomServiceFactory;
 import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
 import reactor.core.publisher.Flux;
 
 import java.time.Clock;
@@ -11,6 +12,7 @@ import java.util.UUID;
 
 
 @RequiredArgsConstructor
+@Slf4j
 public class MongoDbStorageStrategy implements StorageStrategy
 {
   private final ChatHomeRepository repository;
@@ -25,7 +27,9 @@ public class MongoDbStorageStrategy implements StorageStrategy
     chatroomFlux
         .log()
         .map(ChatRoomTo::from)
-        .subscribe(chatroom -> repository.save(chatroom));
+        .flatMap(chatroom -> repository.save(chatroom))
+        .doOnNext(chatRoomTo -> log.debug("Written: {}", chatRoomTo))
+        .blockLast();
   }
 
   @Override