- .doOnNext(chatRoomInfo ->
- writeChatRoomData(
- chatRoomInfo.getId(),
- chatHomeService
- .getChatRoomData(chatRoomInfo.getId())
- .flatMapMany(chatRoomData -> chatRoomData.getMessages()))));
+ .doOnNext(chatRoomInfo -> writtenChatRooms.register())
+ .doOnNext(chatRoomInfo -> writeChatRoomData(
+ chatRoomInfo.getId(),
+ chatHomeService
+ .getChatRoomData(chatRoomInfo.getId())
+ .flatMapMany(chatRoomData -> chatRoomData.getMessages()),
+ (chatRoomId) ->
+ {
+ logSuccess(chatRoomId);
+ writtenChatRooms.arriveAndDeregister();
+ },
+ (chatRoomId, throwable) ->
+ {
+ logFailure(chatRoomId, throwable);
+ numErrors.incrementAndGet();
+ writtenChatRooms.arriveAndDeregister();
+ })));
+
+ writtenChatRooms.arriveAndAwaitAdvance();
+ if (numErrors.get() > 0)
+ {
+ throw new RuntimeException("Could not write all chat-rooms for " + chatHomeService);
+ }
+
+ log.info("All chat-rooms were written successfully for {}", chatHomeService);