this.messages = messages;
}
+ public InMemoryPersistenceStrategy(Flux<Message> messageFlux)
+ {
+ log.debug("Creating InMemoryPersistenceStrategy");
+ messages = new LinkedHashMap<>();
+ messageFlux.subscribe(message -> persistMessage(message));
+ }
@Override
public Mono<Message> persistMessage(
String text)
{
Message message = new Message(key, (long)messages.size(), timestamp, text);
+ return Mono.justOrEmpty(persistMessage(message));
+ }
+ private Message persistMessage(Message message)
+ {
+ Message.MessageKey key = message.getKey();
Message existing = messages.get(key);
if (existing != null)
{
log.info("Message with key {} already exists; {}", key, existing);
if (!message.equals(existing))
throw new MessageMutationException(message, existing);
- return Mono.empty();
+ return null;
}
messages.put(key, message);
- return Mono
- .fromSupplier(() -> message)
- .log();
+ return message;
}
@Override