From 9fbb1a24b8c62619f8e51c5575b70b66fcd99ff8 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sat, 3 Feb 2024 22:44:02 +0100 Subject: [PATCH] refactor: Added success- and failure-callbacks to `StorageStrategy` --- .../implementation/StorageStrategy.java | 24 ++++++++++++++++++- .../storage/files/FilesStorageStrategy.java | 8 +++++-- .../mongodb/MongoDbStorageStrategy.java | 8 ++++++- .../NoStorageStorageConfiguration.java | 10 +++++++- 4 files changed, 45 insertions(+), 5 deletions(-) diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/StorageStrategy.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/StorageStrategy.java index 24fd15f0..ba8bc23a 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/StorageStrategy.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/StorageStrategy.java @@ -3,13 +3,19 @@ package de.juplo.kafka.chat.backend.implementation; import de.juplo.kafka.chat.backend.domain.ChatHomeService; import de.juplo.kafka.chat.backend.domain.ChatRoomInfo; import de.juplo.kafka.chat.backend.domain.Message; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import reactor.core.publisher.Flux; import java.util.UUID; +import java.util.function.BiConsumer; +import java.util.function.Consumer; public interface StorageStrategy { + Logger log = LoggerFactory.getLogger(StorageStrategy.class.getCanonicalName()); + default void write(ChatHomeService chatHomeService) { writeChatRoomInfo( @@ -23,8 +29,24 @@ public interface StorageStrategy .flatMapMany(chatRoomData -> chatRoomData.getMessages())))); } + default void writeChatRoomData(UUID chatRoomId, Flux messageFlux) + { + writeChatRoomData( + chatRoomId, + messageFlux, + (id) -> log.info("Successfully stored chat-room {}", id), + (id, throwable) -> log.error("Could not store chat-room {}: {}", id, throwable)); + } + void writeChatRoomInfo(Flux chatRoomInfoFlux); Flux readChatRoomInfo(); - void writeChatRoomData(UUID chatRoomId, Flux messageFlux); + void writeChatRoomData( + UUID chatRoomId, + Flux messageFlux, + SuccessCallback callback, + FailureCallback failureCallback); Flux readChatRoomData(UUID chatRoomId); + + interface SuccessCallback extends Consumer {} + interface FailureCallback extends BiConsumer {} } diff --git a/src/main/java/de/juplo/kafka/chat/backend/storage/files/FilesStorageStrategy.java b/src/main/java/de/juplo/kafka/chat/backend/storage/files/FilesStorageStrategy.java index cb7dd31e..1de0b445 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/storage/files/FilesStorageStrategy.java +++ b/src/main/java/de/juplo/kafka/chat/backend/storage/files/FilesStorageStrategy.java @@ -121,7 +121,9 @@ public class FilesStorageStrategy implements StorageStrategy @Override public void writeChatRoomData( UUID chatRoomId, - Flux messageFlux) + Flux messageFlux, + SuccessCallback successCallback, + FailureCallback failureCallback) { Path path = chatroomPath(chatRoomId); log.info("Writing messages for {} to {}", chatRoomId, path); @@ -172,10 +174,12 @@ public class FilesStorageStrategy implements StorageStrategy throw new RuntimeException(e); } }); + + successCallback.accept(chatRoomId); } catch (IOException e) { - throw new RuntimeException(e); + failureCallback.accept(chatRoomId, e); } } diff --git a/src/main/java/de/juplo/kafka/chat/backend/storage/mongodb/MongoDbStorageStrategy.java b/src/main/java/de/juplo/kafka/chat/backend/storage/mongodb/MongoDbStorageStrategy.java index b1bead9b..972122e0 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/storage/mongodb/MongoDbStorageStrategy.java +++ b/src/main/java/de/juplo/kafka/chat/backend/storage/mongodb/MongoDbStorageStrategy.java @@ -41,10 +41,16 @@ public class MongoDbStorageStrategy implements StorageStrategy } @Override - public void writeChatRoomData(UUID chatRoomId, Flux messageFlux) + public void writeChatRoomData( + UUID chatRoomId, + Flux messageFlux, + SuccessCallback successCallback, + FailureCallback failureCallback) { messageFlux .map(message -> MessageTo.from(chatRoomId, message)) + .doOnComplete(() -> successCallback.accept(chatRoomId)) + .doOnError(throwable -> failureCallback.accept(chatRoomId, throwable)) .subscribe(messageTo -> messageRepository.save(messageTo)); } diff --git a/src/main/java/de/juplo/kafka/chat/backend/storage/nostorage/NoStorageStorageConfiguration.java b/src/main/java/de/juplo/kafka/chat/backend/storage/nostorage/NoStorageStorageConfiguration.java index 1b20aa37..376679af 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/storage/nostorage/NoStorageStorageConfiguration.java +++ b/src/main/java/de/juplo/kafka/chat/backend/storage/nostorage/NoStorageStorageConfiguration.java @@ -41,7 +41,15 @@ public class NoStorageStorageConfiguration } @Override - public void writeChatRoomData(UUID chatRoomId, Flux messageFlux) {} + public void writeChatRoomData( + UUID chatRoomId, + Flux messageFlux, + SuccessCallback successCallback, + FailureCallback failureCallback + ) + { + successCallback.accept(chatRoomId); + } @Override public Flux readChatRoomData(UUID chatRoomId) -- 2.20.1