1 package de.juplo.kafka.chat.backend.persistence;
3 import de.juplo.kafka.chat.backend.domain.Message;
4 import de.juplo.kafka.chat.backend.domain.MessageMutationException;
5 import de.juplo.kafka.chat.backend.domain.PersistenceStrategy;
7 import lombok.RequiredArgsConstructor;
9 import lombok.extern.slf4j.Slf4j;
10 import org.springframework.stereotype.Component;
11 import reactor.core.publisher.Flux;
12 import reactor.core.publisher.Mono;
13 import reactor.core.publisher.Sinks;
15 import java.time.LocalDateTime;
16 import java.util.LinkedHashMap;
17 import java.util.UUID;
18 import java.util.stream.Stream;
22 @RequiredArgsConstructor
24 public class InMemoryPersistenceStrategy implements PersistenceStrategy
// Backing store of this strategy: messages indexed by their key.
// A LinkedHashMap iterates in insertion order.
// NOTE(review): LinkedHashMap is not synchronized — confirm that access is
// confined to a single thread or guarded elsewhere.
26 private final LinkedHashMap<Message.MessageKey, Message> messages = new LinkedHashMap<>();
/**
 * Stores a message in the in-memory map under the given key.
 *
 * A candidate {@link Message} is built from the key, the current size of the
 * map (used as serial number), the timestamp and the text. The map is then
 * queried for a message already stored under the same key.
 *
 * @param key key under which the message is looked up and stored
 * @param timestamp timestamp handed to the {@link Message} constructor
 * @return the candidate message, supplied lazily via {@code fromSupplier}
 *         — NOTE(review): the result of the duplicate path is not covered
 *         here; confirm against the full method
 * @throws MessageMutationException if the candidate is not equal to the
 *         message found under the key
 */
29 public Mono<Message> persistMessage(
30 Message.MessageKey key,
31 LocalDateTime timestamp,
// The serial number is the map size at call time; it is computed before the
// lookup for an existing entry below.
34 Message message = new Message(key, (long)messages.size(), timestamp, text);
36 Message existing = messages.get(key);
// NOTE(review): presumably this branch is only reached when an entry already
// exists for the key — confirm the guard.
39 log.info("Message with key {} already exists; {}", key, existing);
// NOTE(review): the candidate carries a freshly computed serial number.
// Verify that Message.equals ignores it; otherwise an identical re-send
// would be reported as a mutation.
40 if (!message.equals(existing))
41 throw new MessageMutationException(message, existing);
// Register the new message under its key.
45 messages.put(key, message);
// The supplier is evaluated on subscription and yields the stored message.
47 .fromSupplier(() -> message)
/**
 * Looks up the message stored under the given key.
 *
 * @param key key of the requested message
 * @return a Mono that performs the map lookup when subscribed; since
 *         {@code Mono.fromSupplier} completes empty when the supplier
 *         returns {@code null}, an unknown key yields an empty Mono
 */
52 public Mono<Message> getMessage(Message.MessageKey key)
54 return Mono.fromSupplier(() -> messages.get(key));
/**
 * Emits stored messages whose serial number lies within the given range.
 *
 * @param first lowest serial number to include
 * @param last highest serial number to include
 * @return a Flux created from a stream derived from the message map
 *         — NOTE(review): presumably the map's values; confirm
 */
58 public Flux<Message> getMessages(long first, long last)
60 return Flux.fromStream(messages
// Both bounds are inclusive.
65 long serial = message.getSerialNumber();
66 return serial >= first && serial <= last;