WIP - Ein Versuch (vielleicht Unsinn)
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / implementation / StorageStrategy.java
1 package de.juplo.kafka.chat.backend.implementation;
2
3 import de.juplo.kafka.chat.backend.domain.ChatHomeService;
4 import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
5 import de.juplo.kafka.chat.backend.domain.Message;
6 import org.slf4j.Logger;
7 import org.slf4j.LoggerFactory;
8 import reactor.core.publisher.Flux;
9
10 import java.util.UUID;
11 import java.util.concurrent.Phaser;
12 import java.util.concurrent.atomic.AtomicBoolean;
13 import java.util.concurrent.atomic.AtomicInteger;
14 import java.util.function.BiConsumer;
15 import java.util.function.Consumer;
16
17
18 public interface StorageStrategy
19 {
20   Logger log = LoggerFactory.getLogger(StorageStrategy.class.getCanonicalName());
21
22   AtomicBoolean running = new AtomicBoolean(true);
23
24
25   default void write(ChatHomeService chatHomeService)
26   {
27     if (!running.getAndSet(false))
28     {
29       log.info("{} is not running, skip write...", chatHomeService);
30       return;
31     }
32
33     Phaser writtenChatRooms = new Phaser(1);
34     AtomicInteger numErrors = new AtomicInteger();
35
36     Flux<ChatRoomInfo> chatRoomInfoFlux =
37         chatHomeService
38             .getChatRoomInfo()
39             .doOnNext(chatRoomInfo -> writtenChatRooms.register())
40             .doOnNext(chatRoomInfo -> writeChatRoomData(
41                 chatRoomInfo.getId(),
42                 chatHomeService
43                     .getChatRoomData(chatRoomInfo.getId())
44                     .flatMapMany(chatRoomData -> chatRoomData.getMessages()),
45                 (chatRoomId) ->
46                 {
47                   logSuccess(chatRoomId);
48                   writtenChatRooms.arriveAndDeregister();
49                 },
50                 (chatRoomId, throwable) ->
51                 {
52                   logFailure(chatRoomId, throwable);
53                   numErrors.incrementAndGet();
54                   writtenChatRooms.arriveAndDeregister();
55                 }));
56
57     writeChatRoomInfo(chatRoomInfoFlux);
58
59     writtenChatRooms.arriveAndAwaitAdvance();
60     if (numErrors.get() > 0)
61     {
62       throw new RuntimeException("Could not write all chat-rooms for " + chatHomeService);
63     }
64
65     log.info("All chat-rooms were written successfully for {}", chatHomeService);
66   }
67
68   void writeChatRoomInfo(Flux<ChatRoomInfo> chatRoomInfoFlux);
69   Flux<ChatRoomInfo> readChatRoomInfo();
70   default void writeChatRoomData(
71       UUID chatRoomId,
72       Flux<Message> messageFlux,
73       SuccessCallback successCallback,
74       FailureCallback failureCallback)
75   {
76     writeChatRoomData(
77         chatRoomId,
78         messageFlux
79             .doOnComplete(() -> successCallback.accept(chatRoomId))
80             .doOnError(throwable -> failureCallback.accept(chatRoomId, throwable)));
81   }
82   void writeChatRoomData(UUID chatRoomId, Flux<Message> messageFlux);
83   Flux<Message> readChatRoomData(UUID chatRoomId);
84
85   interface SuccessCallback extends Consumer<UUID> {}
86   interface FailureCallback extends BiConsumer<UUID, Throwable> {}
87
88   default void logSuccess(UUID chatRoomId)
89   {
90     log.info("Successfully stored chat-room {}", chatRoomId);
91   }
92
93   default void logFailure(UUID chatRoomId, Throwable throwable)
94   {
95     log.error("Could not store chat-room {}: {}", chatRoomId, throwable);
96   }
97 }