1 package de.juplo.kafka.chat.backend.implementation;
3 import de.juplo.kafka.chat.backend.domain.ChatHomeService;
4 import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
5 import de.juplo.kafka.chat.backend.domain.Message;
6 import org.slf4j.Logger;
7 import org.slf4j.LoggerFactory;
8 import reactor.core.publisher.Flux;
10 import java.util.UUID;
11 import java.util.function.BiConsumer;
12 import java.util.function.Consumer;
/**
 * Strategy interface for writing and reading chat-room state.
 * <p>
 * It declares abstract operations that write/read {@code ChatRoomInfo} and
 * per-chat-room {@code Message} streams (both as Reactor {@code Flux}), plus
 * default methods that wire these operations together and log the outcome
 * of storing a chat-room.
 */
15 public interface StorageStrategy
17 Logger log = LoggerFactory.getLogger(StorageStrategy.class.getCanonicalName());
/**
 * Default entry point that writes out chat-room state for the given
 * {@link ChatHomeService}.
 * <p>
 * For every {@code ChatRoomInfo} element observed through {@code doOnNext},
 * the callback-taking variant of {@code writeChatRoomData} is invoked with
 * the messages obtained from
 * {@code getChatRoomData(chatRoomInfo.getId()).flatMapMany(data -> data.getMessages())}
 * and with {@code logSuccessChatRoom} / {@code logFailureChatRoom} as the
 * success and failure callbacks.
 * <p>
 * NOTE(review): the beginning of this statement is not visible in this
 * excerpt (the stream that {@code doOnNext} is attached to, the first
 * argument of {@code writeChatRoomData} and the receiver of
 * {@code getChatRoomData}) -- confirm against the complete file.
 *
 * @param chatHomeService presumably the source of the chat-room infos and
 *                        chat-room data that are written -- TODO confirm
 */
19 default void write(ChatHomeService chatHomeService)
24 .doOnNext(chatRoomInfo -> writeChatRoomData(
27 .getChatRoomData(chatRoomInfo.getId())
// Maps the looked-up chat-room data to the stream of its messages.
28 .flatMapMany(chatRoomData -> chatRoomData.getMessages()),
// The logging default methods of this interface serve as the callbacks.
29 this::logSuccessChatRoom,
30 this::logFailureChatRoom)));
/**
 * Abstract operation that receives the stream of {@code ChatRoomInfo}
 * elements to be written; the actual behavior is defined by the
 * implementing class.
 * <p>
 * NOTE(review): the method returns {@code void}, so whether and when the
 * given {@code Flux} is subscribed is up to the implementation -- confirm.
 *
 * @param chatRoomInfoFlux the chat-room infos to write
 */
33 void writeChatRoomInfo(Flux<ChatRoomInfo> chatRoomInfoFlux);
/**
 * Abstract operation that provides chat-room infos as a {@code Flux};
 * presumably the read-side counterpart of {@code writeChatRoomInfo} --
 * confirm against the implementations.
 *
 * @return a stream of {@code ChatRoomInfo} elements
 */
34 Flux<ChatRoomInfo> readChatRoomInfo();
/**
 * Variant of {@code writeChatRoomData} that reports the outcome of the
 * write through callbacks.
 * <p>
 * It registers {@code doOnComplete} to call
 * {@code successCallback.accept(chatRoomId)} and {@code doOnError} to call
 * {@code failureCallback.accept(chatRoomId, throwable)}.
 * <p>
 * NOTE(review): several lines of this method are not visible in this
 * excerpt: the declaration of {@code chatRoomId} (presumably a leading
 * {@code UUID} parameter), the receiver of {@code doOnComplete} (presumably
 * {@code messageFlux}) and the call that consumes the decorated stream
 * (presumably the two-argument {@code writeChatRoomData}) -- confirm
 * against the complete file.
 *
 * @param messageFlux     the messages of the chat-room
 * @param successCallback receives the chat-room id when the stream completes
 * @param failureCallback receives the chat-room id and the error when the
 *                        stream signals an error
 */
35 default void writeChatRoomData(
37 Flux<Message> messageFlux,
38 ChatRoomWrittenSuccessCallback successCallback,
39 ChatRoomWrittenFailureCallback failureCallback)
// Completion signal: report success for this chat-room.
44 .doOnComplete(() -> successCallback.accept(chatRoomId))
// Error signal: report the failure together with its cause.
45 .doOnError(throwable -> failureCallback.accept(chatRoomId, throwable)));
/**
 * Abstract operation that receives the messages to be written for one
 * chat-room; the actual behavior is defined by the implementing class.
 * <p>
 * NOTE(review): the method returns {@code void}, so whether and when the
 * given {@code Flux} is subscribed is up to the implementation -- confirm.
 *
 * @param chatRoomId  id of the chat-room the messages belong to
 * @param messageFlux the messages to write
 */
47 void writeChatRoomData(UUID chatRoomId, Flux<Message> messageFlux);
/**
 * Abstract operation that provides the messages of one chat-room as a
 * {@code Flux}; presumably the read-side counterpart of the two-argument
 * {@code writeChatRoomData} -- confirm against the implementations.
 *
 * @param chatRoomId id of the chat-room whose messages are requested
 * @return a stream of {@code Message} elements
 */
48 Flux<Message> readChatRoomData(UUID chatRoomId);
/**
 * Callback type for a successfully written chat-room.
 * <p>
 * It is a plain {@link Consumer} of the chat-room id and adds no methods of
 * its own; it only gives the callback parameter a descriptive type name.
 */
interface ChatRoomWrittenSuccessCallback extends Consumer<UUID>
{
}
/**
 * Callback type for a chat-room that could not be written.
 * <p>
 * It is a plain {@link BiConsumer} of the chat-room id and the error and
 * adds no methods of its own; it only gives the callback parameter a
 * descriptive type name.
 */
interface ChatRoomWrittenFailureCallback extends BiConsumer<UUID, Throwable>
{
}
53 default void logSuccessChatRoom(UUID chatRoomId)
55 log.info("Successfully stored chat-room {}", chatRoomId);
58 default void logFailureChatRoom(UUID chatRoomId, Throwable throwable)
60 log.error("Could not store chat-room {}: {}", chatRoomId, throwable);