import reactor.core.publisher.Flux;
import java.util.UUID;
+import java.util.concurrent.Phaser;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
default void write(ChatHomeService chatHomeService)
{
+ Phaser writtenChatRooms = new Phaser(1);
+ AtomicInteger numErrors = new AtomicInteger();
+
writeChatRoomInfo(
chatHomeService
.getChatRoomInfo()
- .doOnNext(chatRoomInfo ->
- writeChatRoomData(
- chatRoomInfo.getId(),
- chatHomeService
- .getChatRoomData(chatRoomInfo.getId())
- .flatMapMany(chatRoomData -> chatRoomData.getMessages()),
- this::logSuccess,
- this::logFailure)));
+ .doOnNext(chatRoomInfo -> writtenChatRooms.register())
+ .doOnNext(chatRoomInfo -> writeChatRoomData(
+ chatRoomInfo.getId(),
+ chatHomeService
+ .getChatRoomData(chatRoomInfo.getId())
+ .flatMapMany(chatRoomData -> chatRoomData.getMessages()),
+ (chatRoomId) ->
+ {
+ logSuccess(chatRoomId);
+ writtenChatRooms.arriveAndDeregister();
+ },
+ (chatRoomId, throwable) ->
+ {
+ logFailure(chatRoomId, throwable);
+ numErrors.incrementAndGet();
+ writtenChatRooms.arriveAndDeregister();
+ })));
+
+ writtenChatRooms.arriveAndAwaitAdvance();
+ if (numErrors.get() > 0)
+ {
+ throw new RuntimeException("Could not write all chat-rooms for " + chatHomeService);
+ }
+
+ log.info("All chat-rooms were written successfully for {}", chatHomeService);
}
void writeChatRoomInfo(Flux<ChatRoomInfo> chatRoomInfoFlux);