1 package de.juplo.kafka.chat.backend.implementation;
3 import de.juplo.kafka.chat.backend.domain.ChatHomeService;
4 import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
5 import de.juplo.kafka.chat.backend.domain.Message;
6 import org.slf4j.Logger;
7 import org.slf4j.LoggerFactory;
8 import reactor.core.publisher.Flux;
10 import java.util.UUID;
11 import java.util.concurrent.Phaser;
12 import java.util.concurrent.atomic.AtomicBoolean;
13 import java.util.concurrent.atomic.AtomicInteger;
14 import java.util.function.BiConsumer;
15 import java.util.function.Consumer;
18 public interface StorageStrategy
20 Logger log = LoggerFactory.getLogger(StorageStrategy.class.getCanonicalName());
22 AtomicBoolean running = new AtomicBoolean(true);
/**
 * Writes the chat-rooms of the given {@code ChatHomeService} through this
 * strategy and blocks until every chat-room has reported success or failure.
 * <p>
 * The shared {@code running} flag is atomically set to {@code false} on
 * entry. If it already was {@code false}, the call logs that the write is
 * skipped (the statements following that log call are not visible in this
 * view - presumably an early return; TODO confirm).
 * <p>
 * A {@link Phaser} tracks the outstanding chat-rooms: one party for the
 * caller, plus one party per chat-room emitted by the flux. Each chat-room
 * deregisters itself from within its success or failure callback.
 *
 * @param chatHomeService the service whose chat-rooms are written; also used
 *                        in the log and exception messages
 * @throws RuntimeException if at least one chat-room reported a failure
 */
25 default void write(ChatHomeService chatHomeService)
// getAndSet returns the PREVIOUS value: only a caller that observed "true" proceeds.
27 if (!running.getAndSet(false))
29 log.info("{} is not running, skip write...", chatHomeService);
// One initial party: the calling thread, which arrives further below.
33 Phaser writtenChatRooms = new Phaser(1);
34 AtomicInteger numErrors = new AtomicInteger();
// The side effects below are attached to the flux; they only run once the
// flux is consumed by writeChatRoomInfo().
36 Flux<ChatRoomInfo> chatRoomInfoFlux =
// Register one additional phaser party for every emitted chat-room ...
39 .doOnNext(chatRoomInfo -> writtenChatRooms.register())
// ... and start writing the messages of that chat-room.
40 .doOnNext(chatRoomInfo -> writeChatRoomData(
43 .getChatRoomData(chatRoomInfo.getId())
44 .flatMapMany(chatRoomData -> chatRoomData.getMessages()),
// Success callback: log and release this chat-room's phaser party.
47 logSuccess(chatRoomId);
48 writtenChatRooms.arriveAndDeregister();
// Failure callback: log, count the error and release the phaser party.
50 (chatRoomId, throwable) ->
52 logFailure(chatRoomId, throwable);
53 numErrors.incrementAndGet();
54 writtenChatRooms.arriveAndDeregister();
57 writeChatRoomInfo(chatRoomInfoFlux);
// Arrive as the initial party and wait until all registered chat-rooms have
// arrived as well.
// NOTE(review): if an implementation consumes the flux asynchronously, this
// may be reached before any chat-room was registered - confirm that
// writeChatRoomInfo() has consumed the flux when it returns.
59 writtenChatRooms.arriveAndAwaitAdvance();
60 if (numErrors.get() > 0)
62 throw new RuntimeException("Could not write all chat-rooms for " + chatHomeService);
65 log.info("All chat-rooms were written successfully for {}", chatHomeService);
68 default void writeChatRoomData(UUID chatRoomId, Flux<Message> messageFlux)
73 (id) -> logSuccess(id),
74 (id, throwable) -> logFailure(id, throwable));
/**
 * Writes the given chat-room infos.
 * <p>
 * {@code write(ChatHomeService)} passes a flux that carries per-chat-room
 * side effects (registration and writing of the chat-room's messages); those
 * only run when the flux is actually consumed by the implementation.
 *
 * @param chatRoomInfoFlux the chat-room infos to write
 */
77 void writeChatRoomInfo(Flux<ChatRoomInfo> chatRoomInfoFlux);
/**
 * Supplies chat-room infos as a {@code Flux}.
 * <p>
 * Presumably the read counterpart of {@code writeChatRoomInfo} - no caller
 * is visible in this file; confirm against the implementations.
 *
 * @return a flux of chat-room infos
 */
78 Flux<ChatRoomInfo> readChatRoomInfo();
/**
 * Writes the messages of one chat-room and reports the outcome through the
 * given callbacks.
 * <p>
 * {@code write(ChatHomeService)} waits on a {@code Phaser} party that is only
 * released from inside these callbacks. Implementations are therefore
 * expected to invoke exactly one of the two callbacks, once, for every call -
 * otherwise {@code write} never returns.
 * <p>
 * NOTE(review): the first parameter is not visible in this view; the callers
 * in this file pass the chat-room id first.
 *
 * @param messageFlux     the messages to write
 * @param callback        invoked with the chat-room id on success
 * @param failureCallback invoked with the chat-room id and the cause on failure
 */
79 void writeChatRoomData(
81 Flux<Message> messageFlux,
82 SuccessCallback callback,
83 FailureCallback failureCallback);
/**
 * Supplies the messages of the chat-room with the given id as a {@code Flux}.
 * <p>
 * Presumably the read counterpart of {@code writeChatRoomData} - no caller
 * is visible in this file; confirm against the implementations.
 *
 * @param chatRoomId id of the chat-room whose messages are requested
 * @return a flux of messages
 */
84 Flux<Message> readChatRoomData(UUID chatRoomId);
/**
 * Callback that receives the id of a chat-room that was written successfully.
 * <p>
 * Marked as a functional interface because it is used as a lambda target;
 * the compiler then rejects any accidental second abstract method.
 */
@FunctionalInterface
interface SuccessCallback extends Consumer<UUID>
{
}
/**
 * Callback that receives the id of a chat-room that could not be written,
 * together with the cause of the failure.
 * <p>
 * Marked as a functional interface because it is used as a lambda target;
 * the compiler then rejects any accidental second abstract method.
 */
@FunctionalInterface
interface FailureCallback extends BiConsumer<UUID, Throwable>
{
}
89 default void logSuccess(UUID chatRoomId)
91 log.info("Successfully stored chat-room {}", chatRoomId);
94 default void logFailure(UUID chatRoomId, Throwable throwable)
96 log.error("Could not store chat-room {}: {}", chatRoomId, throwable);