1 package de.juplo.kafka.chat.backend.implementation;
3 import de.juplo.kafka.chat.backend.domain.ChatHomeService;
4 import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
5 import de.juplo.kafka.chat.backend.domain.Message;
6 import org.slf4j.Logger;
7 import org.slf4j.LoggerFactory;
8 import reactor.core.publisher.Flux;
10 import java.util.UUID;
11 import java.util.function.BiConsumer;
12 import java.util.function.Consumer;
15 public interface StorageStrategy
17 Logger log = LoggerFactory.getLogger(StorageStrategy.class.getCanonicalName());
19 default Flux<ChatRoomInfo> write(ChatHomeService chatHomeService)
23 this::logSuccessChatHomeService,
24 this::logFailureChatHomeService);
/**
 * Returns the result of {@code writeChatRoomInfo(...)} applied to a flux of
 * chat-room infos that is decorated with the given callbacks and that, for
 * every element passing by, starts a separate write of that room's messages.
 *
 * NOTE(review): several lines of this method are not visible in this view,
 * among them the expression that produces the source flux and the call that
 * receives the message flux together with the two chat-room logging
 * callbacks. Presumably the source is the chat-room info flux of
 * {@code chatHomeService} and the receiving call is the four-argument
 * {@code writeChatRoomData} -- confirm against the complete file.
 *
 * @param chatHomeService passed to both callbacks and queried for the chat-room data
 * @param successCallback invoked with {@code chatHomeService} when the decorated flux completes
 * @param failureCallback invoked with {@code chatHomeService} and the error when the decorated flux fails
 * @return whatever {@code writeChatRoomInfo(...)} returns for the decorated flux
 */
27 default Flux<ChatRoomInfo> write(
28 ChatHomeService chatHomeService,
29 ChatHomeServiceWrittenSuccessCallback successCallback,
30 ChatHomeServiceWrittenFailureCallback failureCallback)
32 return writeChatRoomInfo(
// Completion and error of the chat-room info flux are reported through the
// callbacks. This happens independently of the nested writes started below,
// because those are subscribed on their own.
35 .doOnComplete(() -> successCallback.accept(chatHomeService))
36 .doOnError(throwable -> failureCallback.accept(chatHomeService, throwable))
// Side effect per chat-room info: look up the chat-room data by the id of the
// info and flatten it into its stream of messages.
37 .doOnNext(chatRoomInfo ->
41 .getChatRoomData(chatRoomInfo.getId())
42 .flatMapMany(chatRoomData -> chatRoomData.getMessages()),
// The outcome of each nested write is reported through the logging callbacks.
// NOTE(review): subscribe() is called inside doOnNext and its result is
// discarded, so the nested write is not part of the returned flux: it cannot
// be cancelled through it, and the returned flux does not wait for it.
44 this::logSuccessChatRoom,
45 this::logFailureChatRoom).subscribe()));
/**
 * Abstract storage hook that implementations have to provide. The default
 * {@code write(...)} method calls it with a flux of chat-room infos that is
 * decorated with the success/failure callbacks, and returns its result to the
 * caller unchanged.
 *
 * @param chatRoomInfoFlux the chat-room infos handed over for writing
 * @return a flux of chat-room infos; presumably the written elements -- confirm against the implementations
 */
48 Flux<ChatRoomInfo> writeChatRoomInfo(Flux<ChatRoomInfo> chatRoomInfoFlux);
/**
 * Abstract storage hook that implementations have to provide. It is not
 * called by any code visible in this view; presumably the read-side
 * counterpart of {@code writeChatRoomInfo} -- confirm against the
 * implementations.
 *
 * @return a flux of chat-room infos
 */
49 Flux<ChatRoomInfo> readChatRoomInfo();
50 default Flux<Message> writeChatRoomData(
52 Flux<Message> messageFlux,
53 ChatRoomWrittenSuccessCallback successCallback,
54 ChatRoomWrittenFailureCallback failureCallback)
56 return writeChatRoomData(
59 .doOnComplete(() -> successCallback.accept(chatRoomId))
60 .doOnError(throwable -> failureCallback.accept(chatRoomId, throwable)));
/**
 * Abstract storage hook that implementations have to provide. The default
 * four-argument overload calls it with the message flux decorated with the
 * success/failure callbacks and returns its result to the caller unchanged.
 *
 * @param chatRoomId the id of the chat-room the messages are handed over for
 * @param messageFlux the messages handed over for writing
 * @return a flux of messages; presumably the written elements -- confirm against the implementations
 */
62 Flux<Message> writeChatRoomData(UUID chatRoomId, Flux<Message> messageFlux);
/**
 * Abstract storage hook that implementations have to provide. It is not
 * called by any code visible in this view; presumably the read-side
 * counterpart of {@code writeChatRoomData} -- confirm against the
 * implementations.
 *
 * @param chatRoomId the id of the chat-room to read
 * @return a flux of messages
 */
63 Flux<Message> readChatRoomData(UUID chatRoomId);
65 interface ChatHomeServiceWrittenSuccessCallback extends Consumer<ChatHomeService> {}
66 interface ChatHomeServiceWrittenFailureCallback extends BiConsumer<ChatHomeService, Throwable> {}
68 default void logSuccessChatHomeService(ChatHomeService chatHomeService)
70 log.info("Successfully stored {}", chatHomeService);
73 default void logFailureChatHomeService(ChatHomeService chatHomeService, Throwable throwable)
75 log.error("Could not store {}: {}", chatHomeService, throwable);
/**
 * Callback that {@code writeChatRoomData(...)} invokes with the id of the
 * chat-room once the message flux has completed. Marked as a functional
 * interface because it is supplied as a lambda or method reference.
 */
@FunctionalInterface
interface ChatRoomWrittenSuccessCallback extends Consumer<UUID> {}
/**
 * Callback that {@code writeChatRoomData(...)} invokes with the id of the
 * chat-room and the error when the message flux has failed. Marked as a
 * functional interface because it is supplied as a lambda or method reference.
 */
@FunctionalInterface
interface ChatRoomWrittenFailureCallback extends BiConsumer<UUID, Throwable> {}
81 default void logSuccessChatRoom(UUID chatRoomId)
83 log.info("Successfully stored chat-room {}", chatRoomId);
86 default void logFailureChatRoom(UUID chatRoomId, Throwable throwable)
88 log.error("Could not store chat-room {}: {}", chatRoomId, throwable);