1 package de.juplo.kafka.chat.backend.implementation;
3 import de.juplo.kafka.chat.backend.domain.ChatHomeService;
4 import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
5 import de.juplo.kafka.chat.backend.domain.Message;
6 import org.slf4j.Logger;
7 import org.slf4j.LoggerFactory;
8 import reactor.core.publisher.Flux;
10 import java.util.UUID;
11 import java.util.function.BiConsumer;
12 import java.util.function.Consumer;
15 public interface StorageStrategy
/**
 * Logger used by the default methods of this interface.
 * Declared inside an interface, so it is implicitly {@code public static final};
 * it is named after the canonical name of {@code StorageStrategy}.
 */
17 Logger log = LoggerFactory.getLogger(StorageStrategy.class.getCanonicalName());
/**
 * Writes the state reachable through the given {@link ChatHomeService}.
 * <p>
 * The visible part of the pipeline hooks a {@code doOnNext} step onto a
 * stream of chat-room infos, looks up the chat-room data by the id of each
 * info and expands that data into its messages via {@code getMessages()}.
 * <p>
 * NOTE(review): several lines of this body are not visible in this excerpt.
 * Presumably the chat-room infos are handed to {@code writeChatRoomInfo} and
 * the messages of each chat-room to {@code writeChatRoomData} -- confirm
 * against the complete file.
 *
 * @param chatHomeService the service to store; presumably the receiver of the
 *                        {@code getChatRoomData} call below -- TODO confirm
 */
19 default void write(ChatHomeService chatHomeService)
// Side-effect step that runs once per chat-room info passing through.
24 .doOnNext(chatRoomInfo ->
// The id of the current chat-room info selects the chat-room data to load.
28 .getChatRoomData(chatRoomInfo.getId())
// The looked-up chat-room data is expanded into the stream of its messages.
29 .flatMapMany(chatRoomData -> chatRoomData.getMessages()))));
/**
 * Convenience overload that supplies logging callbacks: success is reported
 * at INFO level, failure at ERROR level, both including the chat-room id.
 * <p>
 * NOTE(review): the call that receives the two lambdas is not visible in
 * this excerpt; presumably it is the callback-taking
 * {@code writeChatRoomData} overload of this interface -- confirm.
 *
 * @param chatRoomId  id of the chat-room whose messages are written
 * @param messageFlux the messages to write
 */
32 default void writeChatRoomData(UUID chatRoomId, Flux<Message> messageFlux)
// Success callback: only logs the id of the stored chat-room.
37 (id) -> log.info("Successfully stored chat-room {}", id),
// Failure callback: logs the id together with the throwable.
// NOTE(review): the throwable is the last argument, so SLF4J bindings that
// treat a trailing Throwable as the exception may leave the second "{}"
// unfilled in the rendered message -- verify against the logging backend.
38 (id, throwable) -> log.error("Could not store chat-room {}: {}", id, throwable));
/**
 * Writes the given stream of chat-room infos.
 * How and where the infos are persisted is up to the implementation.
 *
 * @param chatRoomInfoFlux the chat-room infos to write
 */
41 void writeChatRoomInfo(Flux<ChatRoomInfo> chatRoomInfoFlux);
/**
 * Reads chat-room infos.
 * NOTE(review): presumably the counterpart of {@code writeChatRoomInfo},
 * returning what was written before -- confirm with the implementations.
 *
 * @return a stream of chat-room infos
 */
42 Flux<ChatRoomInfo> readChatRoomInfo();
/**
 * Writes the messages of a chat-room and reports the outcome through callbacks.
 * <p>
 * NOTE(review): the first parameter line is not visible in this excerpt;
 * presumably it is the {@code UUID} of the chat-room, matching the default
 * two-argument overload -- confirm against the complete file.
 *
 * @param messageFlux     the messages to write
 * @param callback        receives a {@code UUID} -- presumably the chat-room
 *                        id, once writing succeeded; confirm
 * @param failureCallback receives a {@code UUID} and a {@code Throwable} --
 *                        presumably the chat-room id and the cause, when
 *                        writing failed; confirm
 */
43 void writeChatRoomData(
45 Flux<Message> messageFlux,
46 SuccessCallback callback,
47 FailureCallback failureCallback);
/**
 * Reads the messages belonging to the chat-room with the given id.
 *
 * @param chatRoomId id of the chat-room to read
 * @return a stream of messages
 */
48 Flux<Message> readChatRoomData(UUID chatRoomId);
/**
 * Callback type for the success case: a {@code Consumer} of a {@code UUID}.
 * The default {@code writeChatRoomData} overload passes a lambda that logs
 * the received id as the id of the stored chat-room.
 */
50 interface SuccessCallback extends Consumer<UUID> {}
/**
 * Callback type for the failure case: a {@code BiConsumer} of a {@code UUID}
 * and a {@code Throwable}. The default {@code writeChatRoomData} overload
 * passes a lambda that logs both as the chat-room id and the error.
 */
51 interface FailureCallback extends BiConsumer<UUID, Throwable> {}