WIP - Ein Versuch (vielleicht Unsinn)
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / implementation / StorageStrategy.java
1 package de.juplo.kafka.chat.backend.implementation;
2
3 import de.juplo.kafka.chat.backend.domain.ChatHomeService;
4 import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
5 import de.juplo.kafka.chat.backend.domain.Message;
6 import org.slf4j.Logger;
7 import org.slf4j.LoggerFactory;
8 import reactor.core.publisher.Flux;
9
10 import java.util.UUID;
11 import java.util.concurrent.Phaser;
12 import java.util.concurrent.atomic.AtomicBoolean;
13 import java.util.concurrent.atomic.AtomicInteger;
14 import java.util.function.BiConsumer;
15 import java.util.function.Consumer;
16
17
18 public interface StorageStrategy
19 {
20   Logger log = LoggerFactory.getLogger(StorageStrategy.class.getCanonicalName());
21
22   AtomicBoolean running = new AtomicBoolean(true);
23
24
25   default void write(ChatHomeService chatHomeService)
26   {
27     if (!running.getAndSet(false))
28     {
29       log.info("{} is not running, skip write...", chatHomeService);
30       return;
31     }
32
33     Phaser writtenChatRooms = new Phaser(1);
34     AtomicInteger numErrors = new AtomicInteger();
35
36     Flux<ChatRoomInfo> chatRoomInfoFlux =
37         chatHomeService
38             .getChatRoomInfo()
39             .doOnNext(chatRoomInfo -> writtenChatRooms.register())
40             .doOnNext(chatRoomInfo -> writeChatRoomData(
41                 chatRoomInfo.getId(),
42                 chatHomeService
43                     .getChatRoomData(chatRoomInfo.getId())
44                     .flatMapMany(chatRoomData -> chatRoomData.getMessages()),
45                 (chatRoomId) ->
46                 {
47                   logSuccess(chatRoomId);
48                   writtenChatRooms.arriveAndDeregister();
49                 },
50                 (chatRoomId, throwable) ->
51                 {
52                   logFailure(chatRoomId, throwable);
53                   numErrors.incrementAndGet();
54                   writtenChatRooms.arriveAndDeregister();
55                 }));
56
57     writeChatRoomInfo(chatRoomInfoFlux);
58
59     writtenChatRooms.arriveAndAwaitAdvance();
60     if (numErrors.get() > 0)
61     {
62       throw new RuntimeException("Could not write all chat-rooms for " + chatHomeService);
63     }
64
65     log.info("All chat-rooms were written successfully for {}", chatHomeService);
66   }
67
68   default void writeChatRoomData(UUID chatRoomId, Flux<Message> messageFlux)
69   {
70     writeChatRoomData(
71         chatRoomId,
72         messageFlux,
73         (id) -> logSuccess(id),
74         (id, throwable) -> logFailure(id, throwable));
75   }
76
77   void writeChatRoomInfo(Flux<ChatRoomInfo> chatRoomInfoFlux);
78   Flux<ChatRoomInfo> readChatRoomInfo();
79   void writeChatRoomData(
80       UUID chatRoomId,
81       Flux<Message> messageFlux,
82       SuccessCallback callback,
83       FailureCallback failureCallback);
84   Flux<Message> readChatRoomData(UUID chatRoomId);
85
86   interface SuccessCallback extends Consumer<UUID> {}
87   interface FailureCallback extends BiConsumer<UUID, Throwable> {}
88
89   default void logSuccess(UUID chatRoomId)
90   {
91     log.info("Successfully stored chat-room {}", chatRoomId);
92   }
93
94   default void logFailure(UUID chatRoomId, Throwable throwable)
95   {
96     log.error("Could not store chat-room {}: {}", chatRoomId, throwable);
97   }
98 }