X-Git-Url: http://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fchat%2Fbackend%2Fstorage%2Fmongodb%2FMongoDbStorageStrategy.java;h=3f9ff209c4bf6441b4c1e7e2058465ea71ef52c4;hb=443be2b6cb083c9042820ab05ab9fdd885b022aa;hp=972122e0e6efcee0b4cc021299e0fd72f2d95b84;hpb=9fbb1a24b8c62619f8e51c5575b70b66fcd99ff8;p=demos%2Fkafka%2Fchat diff --git a/src/main/java/de/juplo/kafka/chat/backend/storage/mongodb/MongoDbStorageStrategy.java b/src/main/java/de/juplo/kafka/chat/backend/storage/mongodb/MongoDbStorageStrategy.java index 972122e0..3f9ff209 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/storage/mongodb/MongoDbStorageStrategy.java +++ b/src/main/java/de/juplo/kafka/chat/backend/storage/mongodb/MongoDbStorageStrategy.java @@ -25,14 +25,14 @@ public class MongoDbStorageStrategy implements StorageStrategy { chatRoomInfoFlux .map(ChatRoomTo::from) - .subscribe(chatroomTo -> chatRoomRepository.save(chatroomTo)); + .flatMap(chatroomTo -> chatRoomRepository.save(chatroomTo)) + .subscribe(); } @Override public Flux readChatRoomInfo() { - return Flux - .fromIterable(chatRoomRepository.findAll()) + return chatRoomRepository.findAll() .map(chatRoomTo -> { UUID chatRoomId = UUID.fromString(chatRoomTo.getId()); @@ -41,24 +41,18 @@ public class MongoDbStorageStrategy implements StorageStrategy } @Override - public void writeChatRoomData( - UUID chatRoomId, - Flux messageFlux, - SuccessCallback successCallback, - FailureCallback failureCallback) + public void writeChatRoomData(UUID chatRoomId, Flux messageFlux) { messageFlux .map(message -> MessageTo.from(chatRoomId, message)) - .doOnComplete(() -> successCallback.accept(chatRoomId)) - .doOnError(throwable -> failureCallback.accept(chatRoomId, throwable)) - .subscribe(messageTo -> messageRepository.save(messageTo)); + .flatMap(messageTo -> messageRepository.save(messageTo)) + .subscribe(); } @Override public Flux readChatRoomData(UUID chatRoomId) { - return Flux - .fromIterable(messageRepository.findByChatRoomIdOrderBySerialAsc(chatRoomId.toString())) + return messageRepository.findByChatRoomIdOrderBySerialAsc(chatRoomId.toString()) .map(messageTo -> messageTo.toMessage()); } }