From: Kai Moritz Date: Sun, 18 Feb 2024 16:34:40 +0000 (+0100) Subject: WIP:callbacks X-Git-Tag: rebase--2024-02-18--18-12~6 X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=fb6aa205877a91cc24af68a13f9a2d50555b68ea;p=demos%2Fkafka%2Fchat WIP:callbacks --- diff --git a/src/main/java/de/juplo/kafka/chat/backend/storage/files/FilesStorageStrategy.java b/src/main/java/de/juplo/kafka/chat/backend/storage/files/FilesStorageStrategy.java index 1de0b445..cb7dd31e 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/storage/files/FilesStorageStrategy.java +++ b/src/main/java/de/juplo/kafka/chat/backend/storage/files/FilesStorageStrategy.java @@ -121,9 +121,7 @@ public class FilesStorageStrategy implements StorageStrategy @Override public void writeChatRoomData( UUID chatRoomId, - Flux messageFlux, - SuccessCallback successCallback, - FailureCallback failureCallback) + Flux messageFlux) { Path path = chatroomPath(chatRoomId); log.info("Writing messages for {} to {}", chatRoomId, path); @@ -174,12 +172,10 @@ public class FilesStorageStrategy implements StorageStrategy throw new RuntimeException(e); } }); - - successCallback.accept(chatRoomId); } catch (IOException e) { - failureCallback.accept(chatRoomId, e); + throw new RuntimeException(e); } } diff --git a/src/main/java/de/juplo/kafka/chat/backend/storage/mongodb/MongoDbStorageStrategy.java b/src/main/java/de/juplo/kafka/chat/backend/storage/mongodb/MongoDbStorageStrategy.java index 972122e0..b1bead9b 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/storage/mongodb/MongoDbStorageStrategy.java +++ b/src/main/java/de/juplo/kafka/chat/backend/storage/mongodb/MongoDbStorageStrategy.java @@ -41,16 +41,10 @@ public class MongoDbStorageStrategy implements StorageStrategy } @Override - public void writeChatRoomData( - UUID chatRoomId, - Flux messageFlux, - SuccessCallback successCallback, - FailureCallback failureCallback) + public void writeChatRoomData(UUID chatRoomId, Flux messageFlux) { messageFlux .map(message -> MessageTo.from(chatRoomId, message)) - .doOnComplete(() -> successCallback.accept(chatRoomId)) - .doOnError(throwable -> failureCallback.accept(chatRoomId, throwable)) .subscribe(messageTo -> messageRepository.save(messageTo)); } diff --git a/src/main/java/de/juplo/kafka/chat/backend/storage/nostorage/NoStorageStorageConfiguration.java b/src/main/java/de/juplo/kafka/chat/backend/storage/nostorage/NoStorageStorageConfiguration.java index 376679af..1b20aa37 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/storage/nostorage/NoStorageStorageConfiguration.java +++ b/src/main/java/de/juplo/kafka/chat/backend/storage/nostorage/NoStorageStorageConfiguration.java @@ -41,15 +41,7 @@ public class NoStorageStorageConfiguration } @Override - public void writeChatRoomData( - UUID chatRoomId, - Flux messageFlux, - SuccessCallback successCallback, - FailureCallback failureCallback - ) - { - successCallback.accept(chatRoomId); - } + public void writeChatRoomData(UUID chatRoomId, Flux messageFlux) {} @Override public Flux readChatRoomData(UUID chatRoomId)