- .doOnNext(chatRoomInfo ->
- writeChatRoomData(
- chatRoomInfo.getId(),
- chatHomeService
- .getChatRoomData(chatRoomInfo.getId())
- .flatMapMany(chatRoomData -> chatRoomData.getMessages()))));
- }
+ .doOnNext(chatRoomInfo -> writtenChatRooms.register())
+ .doOnNext(chatRoomInfo -> writeChatRoomData(
+ chatRoomInfo.getId(),
+ chatHomeService
+ .getChatRoomData(chatRoomInfo.getId())
+ .flatMapMany(chatRoomData -> chatRoomData.getMessages()),
+ (chatRoomId) ->
+ {
+ logSuccess(chatRoomId);
+ writtenChatRooms.arriveAndDeregister();
+ },
+ (chatRoomId, throwable) ->
+ {
+ logFailure(chatRoomId, throwable);
+ numErrors.incrementAndGet();
+ writtenChatRooms.arriveAndDeregister();
+ })));