refactor: Added success- and failure-callbacks to `StorageStrategy`
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / implementation / StorageStrategy.java
1 package de.juplo.kafka.chat.backend.implementation;
2
3 import de.juplo.kafka.chat.backend.domain.ChatHomeService;
4 import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
5 import de.juplo.kafka.chat.backend.domain.Message;
6 import org.slf4j.Logger;
7 import org.slf4j.LoggerFactory;
8 import reactor.core.publisher.Flux;
9
10 import java.util.UUID;
11 import java.util.function.BiConsumer;
12 import java.util.function.Consumer;
13
14
15 public interface StorageStrategy
16 {
17   Logger log = LoggerFactory.getLogger(StorageStrategy.class.getCanonicalName());
18
19   default void write(ChatHomeService chatHomeService)
20   {
21     writeChatRoomInfo(
22         chatHomeService
23             .getChatRoomInfo()
24             .doOnNext(chatRoomInfo -> writeChatRoomData(
25                 chatRoomInfo.getId(),
26                 chatHomeService
27                     .getChatRoomData(chatRoomInfo.getId())
28                     .flatMapMany(chatRoomData -> chatRoomData.getMessages()),
29                 this::logSuccessChatRoom,
30                 this::logFailureChatRoom)));
31   }
32
33   void writeChatRoomInfo(Flux<ChatRoomInfo> chatRoomInfoFlux);
34   Flux<ChatRoomInfo> readChatRoomInfo();
35   default void writeChatRoomData(
36       UUID chatRoomId,
37       Flux<Message> messageFlux,
38       ChatRoomWrittenSuccessCallback successCallback,
39       ChatRoomWrittenFailureCallback failureCallback)
40   {
41     writeChatRoomData(
42         chatRoomId,
43         messageFlux
44             .doOnComplete(() -> successCallback.accept(chatRoomId))
45             .doOnError(throwable -> failureCallback.accept(chatRoomId, throwable)));
46   }
47   void writeChatRoomData(UUID chatRoomId, Flux<Message> messageFlux);
48   Flux<Message> readChatRoomData(UUID chatRoomId);
49
50   interface ChatRoomWrittenSuccessCallback extends Consumer<UUID> {}
51   interface ChatRoomWrittenFailureCallback extends BiConsumer<UUID, Throwable> {}
52
53   default void logSuccessChatRoom(UUID chatRoomId)
54   {
55     log.info("Successfully stored chat-room {}", chatRoomId);
56   }
57
58   default void logFailureChatRoom(UUID chatRoomId, Throwable throwable)
59   {
60     log.error("Could not store chat-room {}: {}", chatRoomId, throwable);
61   }
62 }