From: Kai Moritz Date: Sun, 18 Feb 2024 16:33:30 +0000 (+0100) Subject: WIP:callbacks X-Git-Tag: rebase--2024-02-18--18-12~7 X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=9531b3e0e1f90a0f85ec1dbb00ae2d206086cccc;p=demos%2Fkafka%2Fchat WIP:callbacks --- diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/StorageStrategy.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/StorageStrategy.java index 9fb115e3..9dd76250 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/StorageStrategy.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/StorageStrategy.java @@ -26,25 +26,26 @@ public interface StorageStrategy chatRoomInfo.getId(), chatHomeService .getChatRoomData(chatRoomInfo.getId()) - .flatMapMany(chatRoomData -> chatRoomData.getMessages())))); - } - - default void writeChatRoomData(UUID chatRoomId, Flux messageFlux) - { - writeChatRoomData( - chatRoomId, - messageFlux, - (id) -> logSuccess(id), - (id, throwable) -> logFailure(id, throwable)); + .flatMapMany(chatRoomData -> chatRoomData.getMessages()), + this::logSuccess, + this::logFailure))); } void writeChatRoomInfo(Flux chatRoomInfoFlux); Flux readChatRoomInfo(); - void writeChatRoomData( + default void writeChatRoomData( UUID chatRoomId, Flux messageFlux, - SuccessCallback callback, - FailureCallback failureCallback); + SuccessCallback successCallback, + FailureCallback failureCallback) + { + writeChatRoomData( + chatRoomId, + messageFlux + .doOnComplete(() -> successCallback.accept(chatRoomId)) + .doOnError(throwable -> failureCallback.accept(chatRoomId, throwable))); + } + void writeChatRoomData(UUID chatRoomId, Flux messageFlux); Flux readChatRoomData(UUID chatRoomId); interface SuccessCallback extends Consumer {}