import de.juplo.kafka.chat.backend.domain.Message;
import de.juplo.kafka.chat.backend.domain.MessageMutationException;
import de.juplo.kafka.chat.backend.domain.PersistenceStrategy;
-import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.util.LinkedHashMap;
-@RequiredArgsConstructor
@Slf4j
public class InMemoryPersistenceStrategy implements PersistenceStrategy
{
- private final LinkedHashMap<Message.MessageKey, Message> messages = new LinkedHashMap<>();
+ private final LinkedHashMap<Message.MessageKey, Message> messages;
+
+
+ public InMemoryPersistenceStrategy(LinkedHashMap<Message.MessageKey, Message> messages)
+ {
+ this.messages = messages;
+ }
+
+ public InMemoryPersistenceStrategy(Flux<Message> messageFlux)
+ {
+ log.debug("Creating InMemoryPersistenceStrategy");
+ messages = new LinkedHashMap<>();
+ messageFlux.subscribe(message -> persistMessage(message));
+ }
@Override
public Mono<Message> persistMessage(
String text)
{
Message message = new Message(key, (long)messages.size(), timestamp, text);
+ return Mono.justOrEmpty(persistMessage(message));
+ }
+ private Message persistMessage(Message message)
+ {
+ Message.MessageKey key = message.getKey();
Message existing = messages.get(key);
if (existing != null)
{
log.info("Message with key {} already exists; {}", key, existing);
if (!message.equals(existing))
throw new MessageMutationException(message, existing);
- return Mono.empty();
+ return null;
}
messages.put(key, message);
- return Mono
- .fromSupplier(() -> message)
- .log();
+ return message;
}
@Override