refactor: Only `LocalJsonFileStorageStrategy` restores `Chatroom`s
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / persistence / InMemoryPersistenceStrategy.java
index f6c76fa..4b522a8 100644 (file)
@@ -3,27 +3,31 @@ package de.juplo.kafka.chat.backend.persistence;
 import de.juplo.kafka.chat.backend.domain.Message;
 import de.juplo.kafka.chat.backend.domain.MessageMutationException;
 import de.juplo.kafka.chat.backend.domain.PersistenceStrategy;
-import lombok.Getter;
-import lombok.RequiredArgsConstructor;
-import lombok.Value;
 import lombok.extern.slf4j.Slf4j;
-import org.springframework.stereotype.Component;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
-import reactor.core.publisher.Sinks;
 
 import java.time.LocalDateTime;
 import java.util.LinkedHashMap;
-import java.util.UUID;
-import java.util.stream.Stream;
 
 
-@Component
-@RequiredArgsConstructor
 @Slf4j
 public class InMemoryPersistenceStrategy implements PersistenceStrategy
 {
-  private final LinkedHashMap<Message.MessageKey, Message> messages = new LinkedHashMap<>();
+  private final LinkedHashMap<Message.MessageKey, Message> messages;
+
+
+  public InMemoryPersistenceStrategy(LinkedHashMap<Message.MessageKey, Message> messages)
+  {
+    this.messages = messages;
+  }
+
+  public InMemoryPersistenceStrategy(Flux<Message> messageFlux)
+  {
+    log.debug("Creating InMemoryPersistenceStrategy");
+    messages = new LinkedHashMap<>();
+    messageFlux.subscribe(message -> persistMessage(message));
+  }
 
   @Override
   public Mono<Message> persistMessage(
@@ -32,20 +36,23 @@ public class InMemoryPersistenceStrategy implements PersistenceStrategy
       String text)
   {
     Message message = new Message(key, (long)messages.size(), timestamp, text);
+    return Mono.justOrEmpty(persistMessage(message));
+  }
 
+  private Message persistMessage(Message message)
+  {
+    Message.MessageKey key = message.getKey();
     Message existing = messages.get(key);
     if (existing != null)
     {
       log.info("Message with key {} already exists; {}", key, existing);
       if (!message.equals(existing))
         throw new MessageMutationException(message, existing);
-      return Mono.empty();
+      return null;
     }
 
     messages.put(key, message);
-    return Mono
-        .fromSupplier(() -> message)
-        .log();
+    return message;
   }
 
   @Override