import de.juplo.kafka.chat.backend.domain.ChatHomeService;
import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
import de.juplo.kafka.chat.backend.domain.Message;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import java.util.UUID;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
public interface StorageStrategy
{
+ Logger log = LoggerFactory.getLogger(StorageStrategy.class.getCanonicalName());
+
default void write(ChatHomeService chatHomeService)
{
writeChatRoomInfo(
.flatMapMany(chatRoomData -> chatRoomData.getMessages()))));
}
+ default void writeChatRoomData(UUID chatRoomId, Flux<Message> messageFlux)
+ {
+ writeChatRoomData(
+ chatRoomId,
+ messageFlux,
+ (id) -> log.info("Successfully stored chat-room {}", id),
+ (id, throwable) -> log.error("Could not store chat-room {}: {}", id, throwable));
+ }
+
void writeChatRoomInfo(Flux<ChatRoomInfo> chatRoomInfoFlux);
Flux<ChatRoomInfo> readChatRoomInfo();
- void writeChatRoomData(UUID chatRoomId, Flux<Message> messageFlux);
+ void writeChatRoomData(
+ UUID chatRoomId,
+ Flux<Message> messageFlux,
+ SuccessCallback callback,
+ FailureCallback failureCallback);
Flux<Message> readChatRoomData(UUID chatRoomId);
+
+ interface SuccessCallback extends Consumer<UUID> {}
+ interface FailureCallback extends BiConsumer<UUID, Throwable> {}
}
@Override
public void writeChatRoomData(
UUID chatRoomId,
- Flux<Message> messageFlux)
+ Flux<Message> messageFlux,
+ SuccessCallback successCallback,
+ FailureCallback failureCallback)
{
Path path = chatroomPath(chatRoomId);
log.info("Writing messages for {} to {}", chatRoomId, path);
throw new RuntimeException(e);
}
});
+
+ successCallback.accept(chatRoomId);
}
catch (IOException e)
{
- throw new RuntimeException(e);
+ failureCallback.accept(chatRoomId, e);
}
}
}
@Override
- public void writeChatRoomData(UUID chatRoomId, Flux<Message> messageFlux)
+ public void writeChatRoomData(
+ UUID chatRoomId,
+ Flux<Message> messageFlux,
+ SuccessCallback successCallback,
+ FailureCallback failureCallback)
{
messageFlux
.map(message -> MessageTo.from(chatRoomId, message))
+ .doOnComplete(() -> successCallback.accept(chatRoomId))
+ .doOnError(throwable -> failureCallback.accept(chatRoomId, throwable))
.subscribe(messageTo -> messageRepository.save(messageTo));
}
}
@Override
- public void writeChatRoomData(UUID chatRoomId, Flux<Message> messageFlux) {}
+ public void writeChatRoomData(
+ UUID chatRoomId,
+ Flux<Message> messageFlux,
+ SuccessCallback successCallback,
+ FailureCallback failureCallback
+ )
+ {
+ successCallback.accept(chatRoomId);
+ }
@Override
public Flux<Message> readChatRoomData(UUID chatRoomId)