1 package de.juplo.kafka.chat.backend.implementation.inmemory;
3 import de.juplo.kafka.chat.backend.domain.ChatMessageService;
4 import de.juplo.kafka.chat.backend.domain.Message;
5 import de.juplo.kafka.chat.backend.implementation.StorageStrategy;
7 import lombok.extern.slf4j.Slf4j;
8 import reactor.core.publisher.Flux;
9 import reactor.core.publisher.Mono;
11 import java.time.LocalDateTime;
12 import java.util.LinkedHashMap;
13 import java.util.UUID;
17 public class InMemoryChatMessageService implements ChatMessageService
// Id of the chat room this service holds messages for; passed to the
// storage strategy when the messages are restored.
20 private final UUID chatRoomId;
// Messages of the chat room, indexed by their key. The LinkedHashMap keeps
// the entries in insertion order.
// NOTE(review): LinkedHashMap is not synchronized - confirm that this map
// is never accessed from multiple threads concurrently.
21 private final LinkedHashMap<Message.MessageKey, Message> messages;
/**
 * Creates a service for the given chat room that initially holds no messages.
 *
 * @param chatRoomId the id of the chat room whose messages this instance manages
 */
public InMemoryChatMessageService(UUID chatRoomId)
{
  this.chatRoomId = chatRoomId;
  // Insertion-ordered map, initially empty.
  this.messages = new LinkedHashMap<>();
  log.debug("Creating InMemoryChatMessageService");
}
32 Mono<Void> restore(StorageStrategy storageStrategy)
34 Flux<Message> messageFlux = storageStrategy.readChatRoomData(chatRoomId);
37 .doOnNext(message -> messages.put(message.getKey(), message))
39 .doOnSuccess(count -> log.info("Restored InMemoryChatMessageService with {} messages", count))
40 .doOnError(throwable -> log.error("Could not restore InMemoryChatMessageService"))
45 public Mono<Message> persistMessage(
46 Message.MessageKey key,
47 LocalDateTime timestamp,
50 Message message = new Message(key, (long)messages.size(), timestamp, text);
51 messages.put(message.getKey(), message);
52 return Mono.just(message);
56 public Mono<Message> getMessage(Message.MessageKey key)
58 return Mono.fromSupplier(() -> messages.get(key));
62 public Flux<Message> getMessages(long first, long last)
64 return Flux.fromStream(messages
69 long serial = message.getSerialNumber();
70 return serial >= first && serial <= last;