WIP
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / implementation / StorageStrategy.java
1 package de.juplo.kafka.chat.backend.implementation;
2
3 import de.juplo.kafka.chat.backend.domain.ChatHomeService;
4 import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
5 import de.juplo.kafka.chat.backend.domain.Message;
6 import org.slf4j.Logger;
7 import org.slf4j.LoggerFactory;
8 import reactor.core.publisher.Flux;
9
10 import java.util.UUID;
11 import java.util.concurrent.Phaser;
12 import java.util.concurrent.atomic.AtomicBoolean;
13 import java.util.concurrent.atomic.AtomicInteger;
14 import java.util.function.BiConsumer;
15 import java.util.function.Consumer;
16
17
18 public interface StorageStrategy
19 {
20   Logger log = LoggerFactory.getLogger(StorageStrategy.class.getCanonicalName());
21
22   AtomicBoolean running = new AtomicBoolean(true);
23
24
25   default void write(ChatHomeService chatHomeService)
26   {
27     if (!running.getAndSet(false))
28     {
29       log.info("{} is not running, skip write...", chatHomeService);
30       return;
31     }
32
33     Phaser writtenChatRooms = new Phaser(1);
34     AtomicInteger numErrors = new AtomicInteger();
35
36     writeChatRoomInfo(
37         chatHomeService
38             .getChatRoomInfo()
39             .doOnNext(chatRoomInfo -> writtenChatRooms.register())
40             .doOnNext(chatRoomInfo -> writeChatRoomData(
41                 chatRoomInfo.getId(),
42                 chatHomeService
43                     .getChatRoomData(chatRoomInfo.getId())
44                     .flatMapMany(chatRoomData -> chatRoomData.getMessages()),
45                 (chatRoomId) ->
46                 {
47                   logSuccess(chatRoomId);
48                   writtenChatRooms.arriveAndDeregister();
49                 },
50                 (chatRoomId, throwable) ->
51                 {
52                   logFailure(chatRoomId, throwable);
53                   numErrors.incrementAndGet();
54                   writtenChatRooms.arriveAndDeregister();
55                 })));
56
57     writtenChatRooms.arriveAndAwaitAdvance();
58     if (numErrors.get() > 0)
59     {
60       throw new RuntimeException("Could not write all chat-rooms for " + chatHomeService);
61     }
62
63     log.info("All chat-rooms were written successfully for {}", chatHomeService);
64   }
65
66   void writeChatRoomInfo(Flux<ChatRoomInfo> chatRoomInfoFlux);
67   Flux<ChatRoomInfo> readChatRoomInfo();
68   default void writeChatRoomData(
69       UUID chatRoomId,
70       Flux<Message> messageFlux,
71       SuccessCallback successCallback,
72       FailureCallback failureCallback)
73   {
74     writeChatRoomData(
75         chatRoomId,
76         messageFlux
77             .doOnComplete(() -> successCallback.accept(chatRoomId))
78             .doOnError(throwable -> failureCallback.accept(chatRoomId, throwable)));
79   }
80   void writeChatRoomData(UUID chatRoomId, Flux<Message> messageFlux);
81   Flux<Message> readChatRoomData(UUID chatRoomId);
82
83   interface SuccessCallback extends Consumer<UUID> {}
84   interface FailureCallback extends BiConsumer<UUID, Throwable> {}
85
86   default void logSuccess(UUID chatRoomId)
87   {
88     log.info("Successfully stored chat-room {}", chatRoomId);
89   }
90
91   default void logFailure(UUID chatRoomId, Throwable throwable)
92   {
93     log.error("Could not store chat-room {}: {}", chatRoomId, throwable);
94   }
95 }