1 package de.juplo.kafka.chat.backend.implementation.inmemory;
3 import de.juplo.kafka.chat.backend.domain.ChatMessageService;
4 import de.juplo.kafka.chat.backend.domain.Message;
5 import lombok.extern.slf4j.Slf4j;
6 import reactor.core.publisher.Flux;
7 import reactor.core.publisher.Mono;
9 import java.time.LocalDateTime;
10 import java.util.LinkedHashMap;
14 public class InMemoryChatMessageService implements ChatMessageService
16 private final LinkedHashMap<Message.MessageKey, Message> messages;
19 public InMemoryChatMessageService(Flux<Message> messageFlux)
21 log.debug("Creating InMemoryChatMessageService");
22 messages = new LinkedHashMap<>();
24 .doOnNext(message -> messages.put(message.getKey(), message))
26 .doOnSuccess(empty -> log.info("Restored InMemoryChatMessageService"))
27 .doOnError(throwable -> log.error("Could not restore InMemoryChatMessageService"))
32 public Mono<Message> persistMessage(
33 Message.MessageKey key,
34 LocalDateTime timestamp,
37 Message message = new Message(key, (long)messages.size(), timestamp, text);
38 messages.put(message.getKey(), message);
39 return Mono.just(message);
43 public Mono<Message> getMessage(Message.MessageKey key)
45 return Mono.fromSupplier(() -> messages.get(key));
49 public Flux<Message> getMessages(long first, long last)
51 return Flux.fromStream(messages
56 long serial = message.getSerialNumber();
57 return serial >= first && serial <= last;