import de.juplo.kafka.chat.backend.domain.ChatMessageService;
import de.juplo.kafka.chat.backend.domain.Message;
+import de.juplo.kafka.chat.backend.implementation.StorageStrategy;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.time.LocalDateTime;
import java.util.LinkedHashMap;
+import java.util.UUID;
@Slf4j
public class InMemoryChatMessageService implements ChatMessageService
{
+ private final UUID chatRoomId;
private final LinkedHashMap<Message.MessageKey, Message> messages;
- public InMemoryChatMessageService(Flux<Message> messageFlux)
+ public InMemoryChatMessageService(UUID chatRoomId)
{
- log.debug("Creating InMemoryChatRoomService");
+ log.debug("Creating InMemoryChatMessageService");
+ this.chatRoomId = chatRoomId;
messages = new LinkedHashMap<>();
- messageFlux.subscribe(message -> messages.put(message.getKey(), message));
+ }
+
+
+ Mono<Void> restore(StorageStrategy storageStrategy)
+ {
+ Flux<Message> messageFlux = storageStrategy.readChatRoomData(chatRoomId);
+
+ return messageFlux
+ .doOnNext(message -> messages.put(message.getKey(), message))
+ .count()
+ .doOnSuccess(count -> log.info("Restored InMemoryChatMessageService with {} messages", count))
+ .doOnError(throwable -> log.error("Could not restore InMemoryChatMessageService"))
+ .then();
}
@Override