1 package de.juplo.kafka.chat.backend.implementation.inmemory;
3 import de.juplo.kafka.chat.backend.domain.ChatMessageService;
4 import de.juplo.kafka.chat.backend.domain.Message;
5 import de.juplo.kafka.chat.backend.implementation.StorageStrategy;
6 import lombok.extern.slf4j.Slf4j;
7 import reactor.core.publisher.Flux;
8 import reactor.core.publisher.Mono;
10 import java.time.LocalDateTime;
11 import java.util.LinkedHashMap;
12 import java.util.UUID;
16 public class InMemoryChatMessageService implements ChatMessageService
18 private final UUID chatRoomId;
19 private final LinkedHashMap<Message.MessageKey, Message> messages;
22 public InMemoryChatMessageService(UUID chatRoomId)
24 log.debug("Creating InMemoryChatMessageService");
25 this.chatRoomId = chatRoomId;
26 messages = new LinkedHashMap<>();
30 Mono<Void> restore(StorageStrategy storageStrategy)
32 Flux<Message> messageFlux = storageStrategy.readChatRoomData(chatRoomId);
35 .doOnNext(message -> messages.put(message.getKey(), message))
37 .doOnSuccess(count -> log.info("Restored InMemoryChatMessageService with {} messages", count))
38 .doOnError(throwable -> log.error("Could not restore InMemoryChatMessageService"))
43 public Mono<Message> persistMessage(
44 Message.MessageKey key,
45 LocalDateTime timestamp,
48 Message message = new Message(key, (long)messages.size(), timestamp, text);
49 messages.put(message.getKey(), message);
50 return Mono.just(message);
54 public Mono<Message> getMessage(Message.MessageKey key)
56 return Mono.fromSupplier(() -> messages.get(key));
60 public Flux<Message> getMessages(long first, long last)
62 return Flux.fromStream(messages
67 long serial = message.getSerialNumber();
68 return serial >= first && serial <= last;