WIP:callbacks
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / implementation / StorageStrategy.java
1 package de.juplo.kafka.chat.backend.implementation;
2
3 import de.juplo.kafka.chat.backend.domain.ChatHomeService;
4 import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
5 import de.juplo.kafka.chat.backend.domain.Message;
6 import org.slf4j.Logger;
7 import org.slf4j.LoggerFactory;
8 import reactor.core.publisher.Flux;
9
10 import java.util.UUID;
11 import java.util.function.BiConsumer;
12 import java.util.function.Consumer;
13
14
15 public interface StorageStrategy
16 {
17   Logger log = LoggerFactory.getLogger(StorageStrategy.class.getCanonicalName());
18
19   default void write(ChatHomeService chatHomeService)
20   {
21     writeChatRoomInfo(
22         chatHomeService
23             .getChatRoomInfo()
24             .doOnNext(chatRoomInfo ->
25                 writeChatRoomData(
26                     chatRoomInfo.getId(),
27                     chatHomeService
28                         .getChatRoomData(chatRoomInfo.getId())
29                         .flatMapMany(chatRoomData -> chatRoomData.getMessages()))));
30   }
31
32   default void writeChatRoomData(UUID chatRoomId, Flux<Message> messageFlux)
33   {
34     writeChatRoomData(
35         chatRoomId,
36         messageFlux,
37         (id) -> logSuccess(id),
38         (id, throwable) -> logFailure(id, throwable));
39   }
40
41   void writeChatRoomInfo(Flux<ChatRoomInfo> chatRoomInfoFlux);
42   Flux<ChatRoomInfo> readChatRoomInfo();
43   void writeChatRoomData(
44       UUID chatRoomId,
45       Flux<Message> messageFlux,
46       SuccessCallback callback,
47       FailureCallback failureCallback);
48   Flux<Message> readChatRoomData(UUID chatRoomId);
49
50   interface SuccessCallback extends Consumer<UUID> {}
51   interface FailureCallback extends BiConsumer<UUID, Throwable> {}
52
53   default void logSuccess(UUID chatRoomId)
54   {
55     log.info("Successfully stored chat-room {}", chatRoomId);
56   }
57
58   default void logFailure(UUID chatRoomId, Throwable throwable)
59   {
60     log.error("Could not store chat-room {}: {}", chatRoomId, throwable);
61   }
62 }