1 package de.juplo.kafka.chat.backend.persistence;
3 import de.juplo.kafka.chat.backend.domain.Message;
4 import de.juplo.kafka.chat.backend.domain.MessageMutationException;
5 import de.juplo.kafka.chat.backend.domain.PersistenceStrategy;
6 import lombok.RequiredArgsConstructor;
7 import lombok.extern.slf4j.Slf4j;
8 import reactor.core.publisher.Flux;
9 import reactor.core.publisher.Mono;
11 import java.time.LocalDateTime;
12 import java.util.LinkedHashMap;
15 @RequiredArgsConstructor
17 public class InMemoryPersistenceStrategy implements PersistenceStrategy
// In-memory store of all messages, indexed by their message key.
// A LinkedHashMap iterates in insertion order; persistMessage additionally
// uses the map's current size as the serial number of a newly built message.
// NOTE(review): LinkedHashMap is not synchronized -- confirm that this
// strategy is never accessed from several threads at the same time.
19 private final LinkedHashMap<Message.MessageKey, Message> messages = new LinkedHashMap<>();
/**
 * Persists a message under the given key in the in-memory map.
 *
 * <p>A candidate {@link Message} is built from the arguments, using the
 * number of messages stored so far as its serial number. The map is then
 * consulted for a message already stored under the same key: that case is
 * logged, and a candidate that is not equal to the stored message is
 * rejected with a {@link MessageMutationException}.
 *
 * @param key the key under which the message is looked up and stored
 * @param timestamp the timestamp handed to the newly built message
 * @return a {@code Mono} of the message (declared return type; the visible
 *         supplier yields the candidate built by this call)
 * @throws MessageMutationException if the candidate differs from the message
 *         already stored under {@code key}; in the visible code it is raised
 *         by a plain {@code throw} statement
 */
22 public Mono<Message> persistMessage(
23 Message.MessageKey key,
24 LocalDateTime timestamp,
// NOTE(review): 'text' is used below but not declared in the lines visible
// here -- presumably a further parameter of this method; confirm.
// The serial number is the current map size, read before the lookup below.
27 Message message = new Message(key, (long)messages.size(), timestamp, text);
// Look for a message that was stored earlier under the same key.
29 Message existing = messages.get(key);
// NOTE(review): presumably only reached when 'existing' is non-null; the
// guarding condition is not part of the lines visible here.
32 log.info("Message with key {} already exists; {}", key, existing);
// A differing message under an already used key is rejected.
// NOTE(review): the candidate carries a freshly computed serial number, so
// whether an unchanged re-submission compares equal depends on which fields
// Message.equals takes into account -- TODO confirm.
33 if (!message.equals(existing))
34 throw new MessageMutationException(message, existing);
// NOTE(review): presumably only executed when the key was unused so far
// (Map.put would otherwise replace the stored message) -- confirm.
38 messages.put(key, message);
// Supplies the candidate built above; the receiver of this call and the
// return statement are not visible here (presumably "return Mono").
40 .fromSupplier(() -> message)
/**
 * Looks up the message stored under the given key.
 *
 * <p>The map lookup is performed inside the supplier, that is, when the
 * returned {@code Mono} is subscribed to and not when this method is called.
 * According to the Reactor documentation of {@code Mono.fromSupplier}, a
 * supplier result of {@code null} makes the {@code Mono} complete empty.
 *
 * @param key the key of the requested message
 * @return a {@code Mono} emitting the stored message, or completing empty
 *         if the map holds no message for {@code key}
 */
45 public Mono<Message> getMessage(Message.MessageKey key)
47 return Mono.fromSupplier(() -> messages.get(key));
51 public Flux<Message> getMessages(long first, long last)
53 return Flux.fromStream(messages
58 long serial = message.getSerialNumber();
59 return serial >= first && serial <= last;