FIX
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / implementation / StorageStrategy.java
1 package de.juplo.kafka.chat.backend.implementation;
2
3 import de.juplo.kafka.chat.backend.domain.ChatHomeService;
4 import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
5 import de.juplo.kafka.chat.backend.domain.Message;
6 import org.slf4j.Logger;
7 import org.slf4j.LoggerFactory;
8 import reactor.core.publisher.Flux;
9
10 import java.util.UUID;
11 import java.util.concurrent.Phaser;
12 import java.util.concurrent.atomic.AtomicInteger;
13 import java.util.function.BiConsumer;
14 import java.util.function.Consumer;
15
16
17 public interface StorageStrategy
18 {
19   Logger log = LoggerFactory.getLogger(StorageStrategy.class.getCanonicalName());
20
21   default void write(ChatHomeService chatHomeService)
22   {
23     Phaser writtenChatRooms = new Phaser(1);
24     AtomicInteger numErrors = new AtomicInteger();
25
26     writeChatRoomInfo(
27         chatHomeService
28             .getChatRoomInfo()
29             .doOnNext(chatRoomInfo -> writtenChatRooms.register())
30             .doOnNext(chatRoomInfo -> writeChatRoomData(
31                 chatRoomInfo.getId(),
32                 chatHomeService
33                     .getChatRoomData(chatRoomInfo.getId())
34                     .flatMapMany(chatRoomData -> chatRoomData.getMessages()),
35                 (chatRoomId) ->
36                 {
37                   logSuccess(chatRoomId);
38                   writtenChatRooms.arriveAndDeregister();
39                 },
40                 (chatRoomId, throwable) ->
41                 {
42                   logFailure(chatRoomId, throwable);
43                   numErrors.incrementAndGet();
44                   writtenChatRooms.arriveAndDeregister();
45                 })));
46
47     writtenChatRooms.arriveAndAwaitAdvance();
48     if (numErrors.get() > 0)
49     {
50       throw new RuntimeException("Could not write all chat-rooms for " + chatHomeService);
51     }
52
53     log.info("All chat-rooms were written successfully for {}", chatHomeService);
54   }
55
56   void writeChatRoomInfo(Flux<ChatRoomInfo> chatRoomInfoFlux);
57   Flux<ChatRoomInfo> readChatRoomInfo();
58   default void writeChatRoomData(
59       UUID chatRoomId,
60       Flux<Message> messageFlux,
61       SuccessCallback successCallback,
62       FailureCallback failureCallback)
63   {
64     writeChatRoomData(
65         chatRoomId,
66         messageFlux
67             .doOnComplete(() -> successCallback.accept(chatRoomId))
68             .doOnError(throwable -> failureCallback.accept(chatRoomId, throwable)));
69   }
70   void writeChatRoomData(UUID chatRoomId, Flux<Message> messageFlux);
71   Flux<Message> readChatRoomData(UUID chatRoomId);
72
73   interface SuccessCallback extends Consumer<UUID> {}
74   interface FailureCallback extends BiConsumer<UUID, Throwable> {}
75
76   default void logSuccess(UUID chatRoomId)
77   {
78     log.info("Successfully stored chat-room {}", chatRoomId);
79   }
80
81   default void logFailure(UUID chatRoomId, Throwable throwable)
82   {
83     log.error("Could not store chat-room {}: {}", chatRoomId, throwable);
84   }
85 }