From: Kai Moritz Date: Tue, 10 Jan 2023 20:29:58 +0000 (+0100) Subject: WIP:fix-async X-Git-Tag: TEST~1 X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=0dbfbd9f64b9d9415a65a41457c757c928307994;p=demos%2Fkafka%2Fchat WIP:fix-async --- diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/MongoDbStorageStrategy.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/MongoDbStorageStrategy.java index 73f5e8a3..8cbbb556 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/MongoDbStorageStrategy.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/MongoDbStorageStrategy.java @@ -4,6 +4,7 @@ import de.juplo.kafka.chat.backend.domain.ChatRoom; import de.juplo.kafka.chat.backend.persistence.StorageStrategy; import de.juplo.kafka.chat.backend.persistence.storage.files.ChatRoomServiceFactory; import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; import reactor.core.publisher.Flux; import java.time.Clock; @@ -11,6 +12,7 @@ import java.util.UUID; @RequiredArgsConstructor +@Slf4j public class MongoDbStorageStrategy implements StorageStrategy { private final ChatHomeRepository repository; @@ -25,7 +27,9 @@ public class MongoDbStorageStrategy implements StorageStrategy chatroomFlux .log() .map(ChatRoomTo::from) - .subscribe(chatroom -> repository.save(chatroom)); + .flatMap(chatroom -> repository.save(chatroom)) + .doOnNext(chatRoomTo -> log.debug("Written: {}", chatRoomTo)) + .blockLast(); } @Override