X-Git-Url: http://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fchat%2Fbackend%2Fimplementation%2FStorageStrategy.java;h=87208dce32132b4815368eddf68c1c864577f85c;hb=13f86063f851fc2c4ad6de56c8edb78bff9d0592;hp=6fa9c163b608b79d692e36d20d5ef8b931104d44;hpb=1ce142cb9566a9ab5eacb3d1da7f414722e994e9;p=demos%2Fkafka%2Fchat diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/StorageStrategy.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/StorageStrategy.java index 6fa9c163..87208dce 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/StorageStrategy.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/StorageStrategy.java @@ -6,85 +6,35 @@ import de.juplo.kafka.chat.backend.domain.Message; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; import java.util.UUID; -import java.util.function.BiConsumer; -import java.util.function.Consumer; public interface StorageStrategy { Logger log = LoggerFactory.getLogger(StorageStrategy.class.getCanonicalName()); - default Flux write(ChatHomeService chatHomeService) + default Mono write(ChatHomeService chatHomeService) { - return write( - chatHomeService, - this::logSuccessChatHomeService, - this::logFailureChatHomeService); - } - - default Flux write( - ChatHomeService chatHomeService, - ChatHomeServiceWrittenSuccessCallback successCallback, - ChatHomeServiceWrittenFailureCallback failureCallback) - { - return writeChatRoomInfo( - chatHomeService - .getChatRoomInfo() - .doOnComplete(() -> successCallback.accept(chatHomeService)) - .doOnError(throwable -> failureCallback.accept(chatHomeService, throwable)) - .doOnNext(chatRoomInfo -> writeChatRoomData( - chatRoomInfo.getId(), - chatHomeService - .getChatRoomData(chatRoomInfo.getId()) - .flatMapMany(chatRoomData -> chatRoomData.getMessages()), - this::logSuccessChatRoom, - this::logFailureChatRoom - ) - .subscribe())); + return writeChatRoomInfo(chatHomeService.getChatRoomInfo()) + .flatMap(chatRoomInfo -> writeChatRoomData( + chatRoomInfo.getId(), + chatHomeService + .getChatRoomData(chatRoomInfo.getId()) + .flatMapMany(chatRoomData -> chatRoomData.getMessages()) + ) + .count() + .doOnSuccess(count -> log.info("Stored {} messages for {}", count, chatRoomInfo)) + .doOnError(throwable -> log.error("Could not store {}: {}", chatRoomInfo, throwable))) + .count() + .doOnSuccess(count -> log.info("Stored {} chat-rooms for {}", count, chatHomeService)) + .doOnError(throwable -> log.error("Could not store {}: {}", chatHomeService, throwable)) + .then(); } Flux writeChatRoomInfo(Flux chatRoomInfoFlux); Flux readChatRoomInfo(); - default Flux writeChatRoomData( - UUID chatRoomId, - Flux messageFlux, - ChatRoomWrittenSuccessCallback successCallback, - ChatRoomWrittenFailureCallback failureCallback) - { - return writeChatRoomData( - chatRoomId, - messageFlux - .doOnComplete(() -> successCallback.accept(chatRoomId)) - .doOnError(throwable -> failureCallback.accept(chatRoomId, throwable))); - } Flux writeChatRoomData(UUID chatRoomId, Flux messageFlux); Flux readChatRoomData(UUID chatRoomId); - - interface ChatHomeServiceWrittenSuccessCallback extends Consumer {} - interface ChatHomeServiceWrittenFailureCallback extends BiConsumer {} - - default void logSuccessChatHomeService(ChatHomeService chatHomeService) - { - log.info("Successfully stored {}", chatHomeService); - } - - default void logFailureChatHomeService(ChatHomeService chatHomeService, Throwable throwable) - { - log.error("Could not store {}: {}", chatHomeService, throwable); - } - - interface ChatRoomWrittenSuccessCallback extends Consumer {} - interface ChatRoomWrittenFailureCallback extends BiConsumer {} - - default void logSuccessChatRoom(UUID chatRoomId) - { - log.info("Successfully stored chat-room {}", chatRoomId); - } - - default void logFailureChatRoom(UUID chatRoomId, Throwable throwable) - { - log.error("Could not store chat-room {}: {}", chatRoomId, throwable); - } }