package de.juplo.kafka.chat.backend.persistence;
import de.juplo.kafka.chat.backend.domain.Message;
-import de.juplo.kafka.chat.backend.domain.MessageMutationException;
import de.juplo.kafka.chat.backend.domain.ChatRoomService;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
public InMemoryChatRoomService(Flux<Message> messageFlux)
{
- log.debug("Creating InMemoryChatroomService");
+ log.debug("Creating InMemoryChatRoomService");
messages = new LinkedHashMap<>();
- messageFlux.subscribe(message -> persistMessage(message));
+ messageFlux.subscribe(message -> messages.put(message.getKey(), message));
}
@Override
- public Mono<Message> persistMessage(
+ public Message persistMessage(
Message.MessageKey key,
LocalDateTime timestamp,
String text)
{
Message message = new Message(key, (long)messages.size(), timestamp, text);
- return Mono.justOrEmpty(persistMessage(message));
- }
-
- private Message persistMessage(Message message)
- {
- Message.MessageKey key = message.getKey();
- Message existing = messages.get(key);
- if (existing != null)
- {
- log.info("Message with key {} already exists; {}", key, existing);
- if (!message.equals(existing))
- throw new MessageMutationException(message, existing);
- return null;
- }
-
- messages.put(key, message);
+ messages.put(message.getKey(), message);
return message;
}