FIX
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / implementation / StorageStrategy.java
1 package de.juplo.kafka.chat.backend.implementation;
2
3 import de.juplo.kafka.chat.backend.domain.ChatHomeService;
4 import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
5 import de.juplo.kafka.chat.backend.domain.Message;
6 import org.slf4j.Logger;
7 import org.slf4j.LoggerFactory;
8 import reactor.core.publisher.Flux;
9
10 import java.util.UUID;
11 import java.util.function.BiConsumer;
12 import java.util.function.Consumer;
13
14
15 public interface StorageStrategy
16 {
17   Logger log = LoggerFactory.getLogger(StorageStrategy.class.getCanonicalName());
18
19   default void write(ChatHomeService chatHomeService)
20   {
21     writeChatRoomInfo(
22         chatHomeService
23             .getChatRoomInfo()
24             .doOnNext(chatRoomInfo ->
25                 writeChatRoomData(
26                     chatRoomInfo.getId(),
27                     chatHomeService
28                         .getChatRoomData(chatRoomInfo.getId())
29                         .flatMapMany(chatRoomData -> chatRoomData.getMessages()),
30                     this::logSuccess,
31                     this::logFailure)));
32   }
33
34   void writeChatRoomInfo(Flux<ChatRoomInfo> chatRoomInfoFlux);
35   Flux<ChatRoomInfo> readChatRoomInfo();
36   default void writeChatRoomData(
37       UUID chatRoomId,
38       Flux<Message> messageFlux,
39       SuccessCallback successCallback,
40       FailureCallback failureCallback)
41   {
42     writeChatRoomData(
43         chatRoomId,
44         messageFlux
45             .doOnComplete(() -> successCallback.accept(chatRoomId))
46             .doOnError(throwable -> failureCallback.accept(chatRoomId, throwable)));
47   }
48   void writeChatRoomData(UUID chatRoomId, Flux<Message> messageFlux);
49   Flux<Message> readChatRoomData(UUID chatRoomId);
50
51   interface SuccessCallback extends Consumer<UUID> {}
52   interface FailureCallback extends BiConsumer<UUID, Throwable> {}
53
54   default void logSuccess(UUID chatRoomId)
55   {
56     log.info("Successfully stored chat-room {}", chatRoomId);
57   }
58
59   default void logFailure(UUID chatRoomId, Throwable throwable)
60   {
61     log.error("Could not store chat-room {}: {}", chatRoomId, throwable);
62   }
63 }