From 9531b3e0e1f90a0f85ec1dbb00ae2d206086cccc Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sun, 18 Feb 2024 17:33:30 +0100 Subject: [PATCH] WIP:callbacks --- .../implementation/StorageStrategy.java | 27 ++++++++++--------- 1 file changed, 14 insertions(+), 13 deletions(-) diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/StorageStrategy.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/StorageStrategy.java index 9fb115e3..9dd76250 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/StorageStrategy.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/StorageStrategy.java @@ -26,25 +26,26 @@ public interface StorageStrategy chatRoomInfo.getId(), chatHomeService .getChatRoomData(chatRoomInfo.getId()) - .flatMapMany(chatRoomData -> chatRoomData.getMessages())))); - } - - default void writeChatRoomData(UUID chatRoomId, Flux messageFlux) - { - writeChatRoomData( - chatRoomId, - messageFlux, - (id) -> logSuccess(id), - (id, throwable) -> logFailure(id, throwable)); + .flatMapMany(chatRoomData -> chatRoomData.getMessages()), + this::logSuccess, + this::logFailure))); } void writeChatRoomInfo(Flux chatRoomInfoFlux); Flux readChatRoomInfo(); - void writeChatRoomData( + default void writeChatRoomData( UUID chatRoomId, Flux messageFlux, - SuccessCallback callback, - FailureCallback failureCallback); + SuccessCallback successCallback, + FailureCallback failureCallback) + { + writeChatRoomData( + chatRoomId, + messageFlux + .doOnComplete(() -> successCallback.accept(chatRoomId)) + .doOnError(throwable -> failureCallback.accept(chatRoomId, throwable))); + } + void writeChatRoomData(UUID chatRoomId, Flux messageFlux); Flux readChatRoomData(UUID chatRoomId); interface SuccessCallback extends Consumer {} -- 2.20.1