WIP:callbacks - chat-home
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / implementation / StorageStrategy.java
1 package de.juplo.kafka.chat.backend.implementation;
2
3 import de.juplo.kafka.chat.backend.domain.ChatHomeService;
4 import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
5 import de.juplo.kafka.chat.backend.domain.Message;
6 import org.slf4j.Logger;
7 import org.slf4j.LoggerFactory;
8 import reactor.core.publisher.Flux;
9
10 import java.util.UUID;
11 import java.util.function.BiConsumer;
12 import java.util.function.Consumer;
13
14
15 public interface StorageStrategy
16 {
17   Logger log = LoggerFactory.getLogger(StorageStrategy.class.getCanonicalName());
18
19   default void write(ChatHomeService chatHomeService)
20   {
21     write(
22         chatHomeService,
23         this::logSuccessChatHomeService,
24         this::logFailureChatHomeService);
25   }
26
27   default void write(
28       ChatHomeService chatHomeService,
29       ChatHomeServiceWrittenSuccessCallback successCallback,
30       ChatHomeServiceWrittenFailureCallback failureCallback)
31   {
32     writeChatRoomInfo(
33         chatHomeService
34             .getChatRoomInfo()
35             .doOnComplete(() -> successCallback.accept(chatHomeService))
36             .doOnError(throwable -> failureCallback.accept(chatHomeService, throwable))
37             .doOnNext(chatRoomInfo ->
38                 writeChatRoomData(
39                     chatRoomInfo.getId(),
40                     chatHomeService
41                         .getChatRoomData(chatRoomInfo.getId())
42                         .flatMapMany(chatRoomData -> chatRoomData.getMessages()),
43                     this::logSuccessChatRoom,
44                     this::logFailureChatRoom)));
45   }
46   void writeChatRoomInfo(Flux<ChatRoomInfo> chatRoomInfoFlux);
47   Flux<ChatRoomInfo> readChatRoomInfo();
48   default void writeChatRoomData(
49       UUID chatRoomId,
50       Flux<Message> messageFlux,
51       ChatRoomWrittenSuccessCallback successCallback,
52       ChatRoomWrittenFailureCallback failureCallback)
53   {
54     writeChatRoomData(
55         chatRoomId,
56         messageFlux
57             .doOnComplete(() -> successCallback.accept(chatRoomId))
58             .doOnError(throwable -> failureCallback.accept(chatRoomId, throwable)));
59   }
60   void writeChatRoomData(UUID chatRoomId, Flux<Message> messageFlux);
61   Flux<Message> readChatRoomData(UUID chatRoomId);
62
63   interface ChatHomeServiceWrittenSuccessCallback extends Consumer<ChatHomeService> {}
64   interface ChatHomeServiceWrittenFailureCallback extends BiConsumer<ChatHomeService, Throwable> {}
65
66   default void logSuccessChatHomeService(ChatHomeService chatHomeService)
67   {
68     log.info("Successfully stored chat-home {}", chatHomeService);
69   }
70
71   default void logFailureChatHomeService(ChatHomeService chatHomeService, Throwable throwable)
72   {
73     log.error("Could not store chat-home {}: {}", chatHomeService, throwable);
74   }
75
76   interface ChatRoomWrittenSuccessCallback extends Consumer<UUID> {}
77   interface ChatRoomWrittenFailureCallback extends BiConsumer<UUID, Throwable> {}
78
79   default void logSuccessChatRoom(UUID chatRoomId)
80   {
81     log.info("Successfully stored chat-room {}", chatRoomId);
82   }
83
84   default void logFailureChatRoom(UUID chatRoomId, Throwable throwable)
85   {
86     log.error("Could not store chat-room {}: {}", chatRoomId, throwable);
87   }
88 }