WIP:mongodb map vs subscribe - subscribe rausgezogen
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / implementation / StorageStrategy.java
1 package de.juplo.kafka.chat.backend.implementation;
2
3 import de.juplo.kafka.chat.backend.domain.ChatHomeService;
4 import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
5 import de.juplo.kafka.chat.backend.domain.Message;
6 import org.slf4j.Logger;
7 import org.slf4j.LoggerFactory;
8 import reactor.core.publisher.Flux;
9
10 import java.util.UUID;
11 import java.util.function.BiConsumer;
12 import java.util.function.Consumer;
13
14
15 public interface StorageStrategy
16 {
17   Logger log = LoggerFactory.getLogger(StorageStrategy.class.getCanonicalName());
18
19   default Flux<ChatRoomInfo> write(ChatHomeService chatHomeService)
20   {
21     return write(
22         chatHomeService,
23         this::logSuccessChatHomeService,
24         this::logFailureChatHomeService);
25   }
26
27   default Flux<ChatRoomInfo> write(
28       ChatHomeService chatHomeService,
29       ChatHomeServiceWrittenSuccessCallback successCallback,
30       ChatHomeServiceWrittenFailureCallback failureCallback)
31   {
32     return writeChatRoomInfo(
33         chatHomeService
34             .getChatRoomInfo()
35             .doOnComplete(() -> successCallback.accept(chatHomeService))
36             .doOnError(throwable -> failureCallback.accept(chatHomeService, throwable))
37             .doOnNext(chatRoomInfo ->
38                 writeChatRoomData(
39                     chatRoomInfo.getId(),
40                     chatHomeService
41                         .getChatRoomData(chatRoomInfo.getId())
42                         .flatMapMany(chatRoomData -> chatRoomData.getMessages()),
43
44                     this::logSuccessChatRoom,
45                     this::logFailureChatRoom).subscribe()));
46   }
47
48   Flux<ChatRoomInfo> writeChatRoomInfo(Flux<ChatRoomInfo> chatRoomInfoFlux);
49   Flux<ChatRoomInfo> readChatRoomInfo();
50   default Flux<Message> writeChatRoomData(
51       UUID chatRoomId,
52       Flux<Message> messageFlux,
53       ChatRoomWrittenSuccessCallback successCallback,
54       ChatRoomWrittenFailureCallback failureCallback)
55   {
56     return writeChatRoomData(
57         chatRoomId,
58         messageFlux
59             .doOnComplete(() -> successCallback.accept(chatRoomId))
60             .doOnError(throwable -> failureCallback.accept(chatRoomId, throwable)));
61   }
62   Flux<Message> writeChatRoomData(UUID chatRoomId, Flux<Message> messageFlux);
63   Flux<Message> readChatRoomData(UUID chatRoomId);
64
65   interface ChatHomeServiceWrittenSuccessCallback extends Consumer<ChatHomeService> {}
66   interface ChatHomeServiceWrittenFailureCallback extends BiConsumer<ChatHomeService, Throwable> {}
67
68   default void logSuccessChatHomeService(ChatHomeService chatHomeService)
69   {
70     log.info("Successfully stored {}", chatHomeService);
71   }
72
73   default void logFailureChatHomeService(ChatHomeService chatHomeService, Throwable throwable)
74   {
75     log.error("Could not store {}: {}", chatHomeService, throwable);
76   }
77
78   interface ChatRoomWrittenSuccessCallback extends Consumer<UUID> {}
79   interface ChatRoomWrittenFailureCallback extends BiConsumer<UUID, Throwable> {}
80
81   default void logSuccessChatRoom(UUID chatRoomId)
82   {
83     log.info("Successfully stored chat-room {}", chatRoomId);
84   }
85
86   default void logFailureChatRoom(UUID chatRoomId, Throwable throwable)
87   {
88     log.error("Could not store chat-room {}: {}", chatRoomId, throwable);
89   }
90 }