1 package de.juplo.kafka.chat.backend.persistence;
3 import de.juplo.kafka.chat.backend.domain.Message;
4 import de.juplo.kafka.chat.backend.domain.MessageMutationException;
5 import de.juplo.kafka.chat.backend.domain.ChatRoomService;
6 import lombok.extern.slf4j.Slf4j;
7 import reactor.core.publisher.Flux;
8 import reactor.core.publisher.Mono;
10 import java.time.LocalDateTime;
11 import java.util.LinkedHashMap;
15 public class InMemoryChatRoomService implements ChatRoomService
17 private final LinkedHashMap<Message.MessageKey, Message> messages;
20 public InMemoryChatRoomService(LinkedHashMap<Message.MessageKey, Message> messages)
22 this.messages = messages;
25 public InMemoryChatRoomService(Flux<Message> messageFlux)
27 log.debug("Creating InMemoryChatroomService");
28 messages = new LinkedHashMap<>();
29 messageFlux.subscribe(message -> persistMessage(message));
33 public Mono<Message> persistMessage(
34 Message.MessageKey key,
35 LocalDateTime timestamp,
38 Message message = new Message(key, (long)messages.size(), timestamp, text);
39 return Mono.justOrEmpty(persistMessage(message));
42 private Message persistMessage(Message message)
44 Message.MessageKey key = message.getKey();
45 Message existing = messages.get(key);
48 log.info("Message with key {} already exists; {}", key, existing);
49 if (!message.equals(existing))
50 throw new MessageMutationException(message, existing);
54 messages.put(key, message);
59 public Mono<Message> getMessage(Message.MessageKey key)
61 return Mono.fromSupplier(() -> messages.get(key));
65 public Flux<Message> getMessages(long first, long last)
67 return Flux.fromStream(messages
72 long serial = message.getSerialNumber();
73 return serial >= first && serial <= last;