1 package de.juplo.kafka.chat.backend.implementation;
3 import de.juplo.kafka.chat.backend.domain.ChatHomeService;
4 import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
5 import de.juplo.kafka.chat.backend.domain.Message;
6 import org.slf4j.Logger;
7 import org.slf4j.LoggerFactory;
8 import reactor.core.publisher.Flux;
10 import java.util.UUID;
11 import java.util.function.BiConsumer;
12 import java.util.function.Consumer;
15 public interface StorageStrategy
17 Logger log = LoggerFactory.getLogger(StorageStrategy.class.getCanonicalName());
19 default void write(ChatHomeService chatHomeService)
24 .doOnNext(chatRoomInfo ->
28 .getChatRoomData(chatRoomInfo.getId())
29 .flatMapMany(chatRoomData -> chatRoomData.getMessages()),
30 this::logSuccessChatRoom,
31 this::logFailureChatRoom)));
34 void writeChatRoomInfo(Flux<ChatRoomInfo> chatRoomInfoFlux);
35 Flux<ChatRoomInfo> readChatRoomInfo();
36 default void writeChatRoomData(
38 Flux<Message> messageFlux,
39 ChatRoomWrittenSuccessCallback successCallback,
40 ChatRoomWrittenFailureCallback failureCallback)
45 .doOnComplete(() -> successCallback.accept(chatRoomId))
46 .doOnError(throwable -> failureCallback.accept(chatRoomId, throwable)));
48 void writeChatRoomData(UUID chatRoomId, Flux<Message> messageFlux);
49 Flux<Message> readChatRoomData(UUID chatRoomId);
51 interface ChatRoomWrittenSuccessCallback extends Consumer<UUID> {}
52 interface ChatRoomWrittenFailureCallback extends BiConsumer<UUID, Throwable> {}
54 default void logSuccessChatRoom(UUID chatRoomId)
56 log.info("Successfully stored chat-room {}", chatRoomId);
59 default void logFailureChatRoom(UUID chatRoomId, Throwable throwable)
61 log.error("Could not store chat-room {}: {}", chatRoomId, throwable);