1 package de.juplo.kafka.chat.backend.implementation;
3 import de.juplo.kafka.chat.backend.domain.ChatHomeService;
4 import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
5 import de.juplo.kafka.chat.backend.domain.Message;
6 import org.slf4j.Logger;
7 import org.slf4j.LoggerFactory;
8 import reactor.core.publisher.Flux;
10 import java.util.UUID;
11 import java.util.concurrent.Phaser;
12 import java.util.concurrent.atomic.AtomicInteger;
13 import java.util.function.BiConsumer;
14 import java.util.function.Consumer;
/**
 * Contract for writing and reading chat-room data: a stream of {@link ChatRoomInfo}
 * and, per chat-room id, a stream of {@link Message}.
 * <p>
 * The default {@code write(ChatHomeService)} method coordinates the per-room writes;
 * the medium the data is written to is left to implementors and is not visible here.
 */
17 public interface StorageStrategy
// Shared logger (interface fields are implicitly public static final), named after
// the canonical name of this interface.
19 Logger log = LoggerFactory.getLogger(StorageStrategy.class.getCanonicalName());
/**
 * Writes the chat-rooms of the given {@link ChatHomeService} and blocks the calling
 * thread until every chat-room that was registered has reported success or failure.
 * <p>
 * NOTE(review): several lines of this method are not visible in this view (the start
 * of the reactive chain and the first argument of the per-room call); the comments
 * below only describe what the visible lines establish.
 * <p>
 * NOTE(review): the wait relies on every chat-room being registered before the caller
 * arrives. If the stream emits asynchronously, the phase could advance early - confirm
 * against the implementations. Likewise, a message stream that is cancelled without
 * completing or failing would never arrive, and the caller would wait forever.
 *
 * @param chatHomeService source of the per-room message streams (via
 *                        {@code getChatRoomData(id)}); also used in the log/exception text
 * @throws RuntimeException if at least one per-room write reported a failure; the
 *                          individual causes are only logged, not attached
 */
21 default void write(ChatHomeService chatHomeService)
// One party is registered up front for the calling thread; every chat-room adds one more.
23 Phaser writtenChatRooms = new Phaser(1);
// Number of failure callbacks seen; inspected once the wait below returns.
24 AtomicInteger numErrors = new AtomicInteger();
// Register a party for the chat-room before its write is triggered by the next hook.
29 .doOnNext(chatRoomInfo -> writtenChatRooms.register())
// Trigger the write of this chat-room's messages, with success/failure callbacks.
30 .doOnNext(chatRoomInfo -> writeChatRoomData(
33 .getChatRoomData(chatRoomInfo.getId())
34 .flatMapMany(chatRoomData -> chatRoomData.getMessages()),
// Success callback: log, then release this chat-room's party.
37 logSuccess(chatRoomId);
38 writtenChatRooms.arriveAndDeregister();
// Failure callback: log, count the error, then release this chat-room's party.
40 (chatRoomId, throwable) ->
42 logFailure(chatRoomId, throwable);
43 numErrors.incrementAndGet();
44 writtenChatRooms.arriveAndDeregister();
// The caller arrives as the initial party and waits for all registered chat-rooms.
47 writtenChatRooms.arriveAndAwaitAdvance();
48 if (numErrors.get() > 0)
50 throw new RuntimeException("Could not write all chat-rooms for " + chatHomeService);
53 log.info("All chat-rooms were written successfully for {}", chatHomeService);
/**
 * Receives the stream of chat-room infos to be written.
 * NOTE(review): whether and when the implementation subscribes to the stream is not
 * visible here - confirm against the implementing classes.
 *
 * @param chatRoomInfoFlux the chat-room infos to write
 */
56 void writeChatRoomInfo(Flux<ChatRoomInfo> chatRoomInfoFlux);
/**
 * Provides the chat-room infos as a stream; presumably the counterpart of
 * {@code writeChatRoomInfo} - confirm against the implementing classes.
 *
 * @return a stream of chat-room infos
 */
57 Flux<ChatRoomInfo> readChatRoomInfo();
/**
 * Callback-aware variant of the chat-room write: decorates the message stream so that
 * {@code successCallback} is invoked with the chat-room id when the stream completes
 * and {@code failureCallback} is invoked with the id and the error when it fails.
 * <p>
 * NOTE(review): the first parameter and the call that receives the decorated stream
 * are not visible in this view; presumably the stream is handed to
 * {@code writeChatRoomData(UUID, Flux)} - confirm. The callbacks are side-effect hooks
 * on the stream, so they can only fire once something subscribes to it.
 *
 * @param messageFlux     the messages of the chat-room
 * @param successCallback invoked with the chat-room id on completion of the stream
 * @param failureCallback invoked with the chat-room id and the error if the stream fails
 */
58 default void writeChatRoomData(
60 Flux<Message> messageFlux,
61 SuccessCallback successCallback,
62 FailureCallback failureCallback)
// Completion of the message stream counts as success for this chat-room.
67 .doOnComplete(() -> successCallback.accept(chatRoomId))
// An error signal on the message stream is reported together with the chat-room id.
68 .doOnError(throwable -> failureCallback.accept(chatRoomId, throwable)));
/**
 * Receives the message stream to be written for one chat-room.
 * NOTE(review): subscription behaviour is up to the implementation and not visible here.
 *
 * @param chatRoomId  id of the chat-room the messages belong to
 * @param messageFlux the messages to write
 */
70 void writeChatRoomData(UUID chatRoomId, Flux<Message> messageFlux);
/**
 * Provides the messages of one chat-room as a stream; presumably the counterpart of
 * {@code writeChatRoomData(UUID, Flux)} - confirm against the implementing classes.
 *
 * @param chatRoomId id of the chat-room whose messages are requested
 * @return a stream of messages for that chat-room
 */
71 Flux<Message> readChatRoomData(UUID chatRoomId);
/**
 * Callback that receives the id of a chat-room; a named specialisation of
 * {@code Consumer<UUID>} so that lambdas and method references can be passed.
 */
@FunctionalInterface
interface SuccessCallback extends Consumer<UUID> {}
/**
 * Callback that receives the id of a chat-room together with an error; a named
 * specialisation of {@code BiConsumer<UUID, Throwable>} so that lambdas can be passed.
 */
@FunctionalInterface
interface FailureCallback extends BiConsumer<UUID, Throwable> {}
76 default void logSuccess(UUID chatRoomId)
78 log.info("Successfully stored chat-room {}", chatRoomId);
81 default void logFailure(UUID chatRoomId, Throwable throwable)
83 log.error("Could not store chat-room {}: {}", chatRoomId, throwable);