-package de.juplo.kafka.chat.backend.persistence;
+package de.juplo.kafka.chat.backend.implementation;
import de.juplo.kafka.chat.backend.domain.ChatHomeService;
import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
import de.juplo.kafka.chat.backend.domain.Message;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
import java.util.UUID;
public interface StorageStrategy
{
- default void write(ChatHomeService chatHomeService)
+ Logger log = LoggerFactory.getLogger(StorageStrategy.class.getCanonicalName());
+
+ default Mono<Void> write(ChatHomeService chatHomeService)
{
- writeChatRoomInfo(
- chatHomeService
- .getChatRoomInfo()
- .doOnNext(chatRoomInfo -> writeChatRoomData(
- chatRoomInfo.getId(),
- chatHomeService
- .getChatRoomData(chatRoomInfo.getId())
- .flatMapMany(chatRoomData -> chatRoomData.getMessages()))));
+ return writeChatRoomInfo(chatHomeService.getChatRoomInfo())
+ .flatMap(chatRoomInfo -> writeChatRoomData(
+ chatRoomInfo.getId(),
+ chatHomeService
+ .getChatRoomData(chatRoomInfo.getId())
+ .flatMapMany(chatRoomData -> chatRoomData.getMessages())
+ )
+ .count()
+ .doOnSuccess(count -> log.info("Stored {} messages for {}", count, chatRoomInfo))
+ .doOnError(throwable -> log.error("Could not store {}: {}", chatRoomInfo, throwable)))
+ .count()
+ .doOnSuccess(count -> log.info("Stored {} chat-rooms for {}", count, chatHomeService))
+ .doOnError(throwable -> log.error("Could not store {}: {}", chatHomeService, throwable))
+ .then();
}
- void writeChatRoomInfo(Flux<ChatRoomInfo> chatRoomInfoFlux);
+ Flux<ChatRoomInfo> writeChatRoomInfo(Flux<ChatRoomInfo> chatRoomInfoFlux);
Flux<ChatRoomInfo> readChatRoomInfo();
- void writeChatRoomData(UUID chatRoomId, Flux<Message> messageFlux);
+ Flux<Message> writeChatRoomData(UUID chatRoomId, Flux<Message> messageFlux);
Flux<Message> readChatRoomData(UUID chatRoomId);
}