1 package de.juplo.kafka.chat.backend.implementation.inmemory;
3 import de.juplo.kafka.chat.backend.domain.ChatMessageService;
4 import de.juplo.kafka.chat.backend.domain.Message;
5 import de.juplo.kafka.chat.backend.implementation.StorageStrategy;
6 import lombok.extern.slf4j.Slf4j;
7 import reactor.core.publisher.Flux;
8 import reactor.core.publisher.Mono;
10 import java.time.LocalDateTime;
11 import java.util.LinkedHashMap;
12 import java.util.UUID;
16 public class InMemoryChatMessageService implements ChatMessageService
18 private final UUID chatRoomId;
19 private final LinkedHashMap<Message.MessageKey, Message> messages;
22 public InMemoryChatMessageService(UUID chatRoomId)
24 log.debug("Creating InMemoryChatMessageService");
25 this.chatRoomId = chatRoomId;
26 messages = new LinkedHashMap<>();
30 Mono<Void> restore(StorageStrategy storageStrategy)
32 Flux<Message> messageFlux = storageStrategy.readChatRoomData(chatRoomId);
35 .doOnNext(message -> messages.put(message.getKey(), message))
37 .doOnSuccess(empty -> log.info("Restored InMemoryChatMessageService"))
38 .doOnError(throwable -> log.error("Could not restore InMemoryChatMessageService"));
42 public Mono<Message> persistMessage(
43 Message.MessageKey key,
44 LocalDateTime timestamp,
47 Message message = new Message(key, (long)messages.size(), timestamp, text);
48 messages.put(message.getKey(), message);
49 return Mono.just(message);
53 public Mono<Message> getMessage(Message.MessageKey key)
55 return Mono.fromSupplier(() -> messages.get(key));
59 public Flux<Message> getMessages(long first, long last)
61 return Flux.fromStream(messages
66 long serial = message.getSerialNumber();
67 return serial >= first && serial <= last;