import reactor.core.publisher.Flux;
import java.util.UUID;
+import java.util.concurrent.Phaser;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
default void write(ChatHomeService chatHomeService)
{
+ Phaser writtenChatRooms = new Phaser(1);
+ AtomicInteger numErrors = new AtomicInteger();
+
writeChatRoomInfo(
chatHomeService
.getChatRoomInfo()
- .doOnNext(chatRoomInfo ->
- writeChatRoomData(
- chatRoomInfo.getId(),
- chatHomeService
- .getChatRoomData(chatRoomInfo.getId())
- .flatMapMany(chatRoomData -> chatRoomData.getMessages()))));
- }
+ .doOnNext(chatRoomInfo -> writtenChatRooms.register())
+ .doOnNext(chatRoomInfo -> writeChatRoomData(
+ chatRoomInfo.getId(),
+ chatHomeService
+ .getChatRoomData(chatRoomInfo.getId())
+ .flatMapMany(chatRoomData -> chatRoomData.getMessages()),
+ (chatRoomId) ->
+ {
+ logSuccess(chatRoomId);
+ writtenChatRooms.arriveAndDeregister();
+ },
+ (chatRoomId, throwable) ->
+ {
+ logFailure(chatRoomId, throwable);
+ numErrors.incrementAndGet();
+ writtenChatRooms.arriveAndDeregister();
+ })));
- default void writeChatRoomData(UUID chatRoomId, Flux<Message> messageFlux)
- {
- writeChatRoomData(
- chatRoomId,
- messageFlux,
- (id) -> log.info("Successfully stored chat-room {}", id),
- (id, throwable) -> log.error("Could not store chat-room {}: {}", id, throwable));
+ writtenChatRooms.arriveAndAwaitAdvance();
+ if (numErrors.get() > 0)
+ {
+ throw new RuntimeException("Could not write all chat-rooms for " + chatHomeService);
+ }
+
+ log.info("All chat-rooms were written successfully for {}", chatHomeService);
}
void writeChatRoomInfo(Flux<ChatRoomInfo> chatRoomInfoFlux);
Flux<ChatRoomInfo> readChatRoomInfo();
- void writeChatRoomData(
+ default void writeChatRoomData(
UUID chatRoomId,
Flux<Message> messageFlux,
- SuccessCallback callback,
- FailureCallback failureCallback);
+ SuccessCallback successCallback,
+ FailureCallback failureCallback)
+ {
+ writeChatRoomData(
+ chatRoomId,
+ messageFlux
+ .doOnComplete(() -> successCallback.accept(chatRoomId))
+ .doOnError(throwable -> failureCallback.accept(chatRoomId, throwable)));
+ }
+ void writeChatRoomData(UUID chatRoomId, Flux<Message> messageFlux);
Flux<Message> readChatRoomData(UUID chatRoomId);
interface SuccessCallback extends Consumer<UUID> {}
interface FailureCallback extends BiConsumer<UUID, Throwable> {}
+
+ default void logSuccess(UUID chatRoomId)
+ {
+ log.info("Successfully stored chat-room {}", chatRoomId);
+ }
+
+ default void logFailure(UUID chatRoomId, Throwable throwable)
+ {
+ log.error("Could not store chat-room {}: {}", chatRoomId, throwable);
+ }
}