{
Logger log = LoggerFactory.getLogger(StorageStrategy.class.getCanonicalName());
- default void write(ChatHomeService chatHomeService)
+ default Flux<ChatRoomInfo> write(ChatHomeService chatHomeService)
{
- write(
+ return write(
chatHomeService,
this::logSuccessChatHomeService,
this::logFailureChatHomeService);
}
- default void write(
+ default Flux<ChatRoomInfo> write(
ChatHomeService chatHomeService,
ChatHomeServiceWrittenSuccessCallback successCallback,
ChatHomeServiceWrittenFailureCallback failureCallback)
{
- writeChatRoomInfo(
+ return writeChatRoomInfo(
chatHomeService
.getChatRoomInfo()
.doOnComplete(() -> successCallback.accept(chatHomeService))
.getChatRoomData(chatRoomInfo.getId())
.flatMapMany(chatRoomData -> chatRoomData.getMessages()),
this::logSuccessChatRoom,
- this::logFailureChatRoom)));
+ this::logFailureChatRoom
+ )
+ .subscribe()));
}
- void writeChatRoomInfo(Flux<ChatRoomInfo> chatRoomInfoFlux);
+ Flux<ChatRoomInfo> writeChatRoomInfo(Flux<ChatRoomInfo> chatRoomInfoFlux);
Flux<ChatRoomInfo> readChatRoomInfo();
- default void writeChatRoomData(
+ default Flux<Message> writeChatRoomData(
UUID chatRoomId,
Flux<Message> messageFlux,
ChatRoomWrittenSuccessCallback successCallback,
ChatRoomWrittenFailureCallback failureCallback)
{
- writeChatRoomData(
+ return writeChatRoomData(
chatRoomId,
messageFlux
.doOnComplete(() -> successCallback.accept(chatRoomId))
.doOnError(throwable -> failureCallback.accept(chatRoomId, throwable)));
}
- void writeChatRoomData(UUID chatRoomId, Flux<Message> messageFlux);
+ Flux<Message> writeChatRoomData(UUID chatRoomId, Flux<Message> messageFlux);
Flux<Message> readChatRoomData(UUID chatRoomId);
interface ChatHomeServiceWrittenSuccessCallback extends Consumer<ChatHomeService> {}