fix: Refined `ChatBackendControllerTest` and fixed a bug in `ChatRoom`
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / persistence / InMemoryChatRoomService.java
index e06709f..f789d35 100644 (file)
@@ -1,8 +1,7 @@
 package de.juplo.kafka.chat.backend.persistence;
 
 import de.juplo.kafka.chat.backend.domain.Message;
-import de.juplo.kafka.chat.backend.domain.MessageMutationException;
-import de.juplo.kafka.chat.backend.domain.ChatroomService;
+import de.juplo.kafka.chat.backend.domain.ChatRoomService;
 import lombok.extern.slf4j.Slf4j;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
@@ -12,46 +11,31 @@ import java.util.LinkedHashMap;
 
 
 @Slf4j
-public class InMemoryChatroomService implements ChatroomService
+public class InMemoryChatRoomService implements ChatRoomService
 {
   private final LinkedHashMap<Message.MessageKey, Message> messages;
 
 
-  public InMemoryChatroomService(LinkedHashMap<Message.MessageKey, Message> messages)
+  public InMemoryChatRoomService(LinkedHashMap<Message.MessageKey, Message> messages)
   {
     this.messages = messages;
   }
 
-  public InMemoryChatroomService(Flux<Message> messageFlux)
+  public InMemoryChatRoomService(Flux<Message> messageFlux)
   {
-    log.debug("Creating InMemoryChatroomService");
+    log.debug("Creating InMemoryChatRoomService");
     messages = new LinkedHashMap<>();
-    messageFlux.subscribe(message -> persistMessage(message));
+    messageFlux.subscribe(message -> messages.put(message.getKey(), message));
   }
 
   @Override
-  public Mono<Message> persistMessage(
+  public Message persistMessage(
       Message.MessageKey key,
       LocalDateTime timestamp,
       String text)
   {
     Message message = new Message(key, (long)messages.size(), timestamp, text);
-    return Mono.justOrEmpty(persistMessage(message));
-  }
-
-  private Message persistMessage(Message message)
-  {
-    Message.MessageKey key = message.getKey();
-    Message existing = messages.get(key);
-    if (existing != null)
-    {
-      log.info("Message with key {} already exists; {}", key, existing);
-      if (!message.equals(existing))
-        throw new MessageMutationException(message, existing);
-      return null;
-    }
-
-    messages.put(key, message);
+    messages.put(message.getKey(), message);
     return message;
   }