refactor: Moved business-logic from `ChatRoomService` into `ChatRoom`
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / persistence / InMemoryChatRoomService.java
index 1831037..49d400b 100644 (file)
@@ -1,7 +1,6 @@
 package de.juplo.kafka.chat.backend.persistence;
 
 import de.juplo.kafka.chat.backend.domain.Message;
-import de.juplo.kafka.chat.backend.domain.MessageMutationException;
 import de.juplo.kafka.chat.backend.domain.ChatRoomService;
 import lombok.extern.slf4j.Slf4j;
 import reactor.core.publisher.Flux;
@@ -26,32 +25,17 @@ public class InMemoryChatRoomService implements ChatRoomService
   {
     log.debug("Creating InMemoryChatroomService");
     messages = new LinkedHashMap<>();
-    messageFlux.subscribe(message -> persistMessage(message));
+    messageFlux.subscribe(message -> messages.put(message.getKey(), message));
   }
 
   @Override
-  public Mono<Message> persistMessage(
+  public Message persistMessage(
       Message.MessageKey key,
       LocalDateTime timestamp,
       String text)
   {
     Message message = new Message(key, (long)messages.size(), timestamp, text);
-    return Mono.justOrEmpty(persistMessage(message));
-  }
-
-  private Message persistMessage(Message message)
-  {
-    Message.MessageKey key = message.getKey();
-    Message existing = messages.get(key);
-    if (existing != null)
-    {
-      log.info("Message with key {} already exists; {}", key, existing);
-      if (!message.equals(existing))
-        throw new MessageMutationException(message, existing);
-      return null;
-    }
-
-    messages.put(key, message);
+    messages.put(message.getKey(), message);
     return message;
   }