refactor: Added success- and failure-callbacks for `ChatHomeService`
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / implementation / StorageStrategy.java
1 package de.juplo.kafka.chat.backend.implementation;
2
3 import de.juplo.kafka.chat.backend.domain.ChatHomeService;
4 import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
5 import de.juplo.kafka.chat.backend.domain.Message;
6 import org.slf4j.Logger;
7 import org.slf4j.LoggerFactory;
8 import reactor.core.publisher.Flux;
9
10 import java.util.UUID;
11 import java.util.function.BiConsumer;
12 import java.util.function.Consumer;
13
14
15 public interface StorageStrategy
16 {
17   Logger log = LoggerFactory.getLogger(StorageStrategy.class.getCanonicalName());
18
19   default void write(ChatHomeService chatHomeService)
20   {
21     write(
22         chatHomeService,
23         this::logSuccessChatHomeService,
24         this::logFailureChatHomeService);
25   }
26
27   default void write(
28       ChatHomeService chatHomeService,
29       ChatHomeServiceWrittenSuccessCallback successCallback,
30       ChatHomeServiceWrittenFailureCallback failureCallback)
31   {
32     writeChatRoomInfo(
33         chatHomeService
34             .getChatRoomInfo()
35             .doOnComplete(() -> successCallback.accept(chatHomeService))
36             .doOnError(throwable -> failureCallback.accept(chatHomeService, throwable))
37             .doOnNext(chatRoomInfo ->
38                 writeChatRoomData(
39                     chatRoomInfo.getId(),
40                     chatHomeService
41                         .getChatRoomData(chatRoomInfo.getId())
42                         .flatMapMany(chatRoomData -> chatRoomData.getMessages()),
43                     this::logSuccessChatRoom,
44                     this::logFailureChatRoom)));
45   }
46
47   void writeChatRoomInfo(Flux<ChatRoomInfo> chatRoomInfoFlux);
48   Flux<ChatRoomInfo> readChatRoomInfo();
49   default void writeChatRoomData(
50       UUID chatRoomId,
51       Flux<Message> messageFlux,
52       ChatRoomWrittenSuccessCallback successCallback,
53       ChatRoomWrittenFailureCallback failureCallback)
54   {
55     writeChatRoomData(
56         chatRoomId,
57         messageFlux
58             .doOnComplete(() -> successCallback.accept(chatRoomId))
59             .doOnError(throwable -> failureCallback.accept(chatRoomId, throwable)));
60   }
61   void writeChatRoomData(UUID chatRoomId, Flux<Message> messageFlux);
62   Flux<Message> readChatRoomData(UUID chatRoomId);
63
64   interface ChatHomeServiceWrittenSuccessCallback extends Consumer<ChatHomeService> {}
65   interface ChatHomeServiceWrittenFailureCallback extends BiConsumer<ChatHomeService, Throwable> {}
66
67   default void logSuccessChatHomeService(ChatHomeService chatHomeService)
68   {
69     log.info("Successfully stored {}", chatHomeService);
70   }
71
72   default void logFailureChatHomeService(ChatHomeService chatHomeService, Throwable throwable)
73   {
74     log.error("Could not store {}: {}", chatHomeService, throwable);
75   }
76
77   interface ChatRoomWrittenSuccessCallback extends Consumer<UUID> {}
78   interface ChatRoomWrittenFailureCallback extends BiConsumer<UUID, Throwable> {}
79
80   default void logSuccessChatRoom(UUID chatRoomId)
81   {
82     log.info("Successfully stored chat-room {}", chatRoomId);
83   }
84
85   default void logFailureChatRoom(UUID chatRoomId, Throwable throwable)
86   {
87     log.error("Could not store chat-room {}: {}", chatRoomId, throwable);
88   }
89 }