NEU
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / persistence / kafka / KafkaChatRoomService.java
index ed155df..3a8c2c6 100644 (file)
@@ -30,42 +30,14 @@ public class KafkaChatRoomService implements ChatRoomService
     LocalDateTime timestamp,
     String text)
   {
-    return kafkaChatHomeService.sendMessage(chatRoomId, key, timestamp, text);
+    return kafkaChatHomeService
+        .sendMessage(chatRoomId, key, timestamp, text)
+        .doOnSuccess(message -> persistMessage(message));
   }
 
-  /**
-   * {@code synchronized} ist nicht nötig, da Aufruf immer indirekt über
-   * {@link #persistMessage(Message.MessageKey, LocalDateTime, String)}
-   */
-  protected void addMessage(Message message) throws MessageMutationException
+  public void persistMessage(Message message)
   {
-    Message existingMessage = messages.get(message.getKey());
-
-    // TODO: Ist der Test nötig, oder wird das durch den Kontrollierten Wechsel
-    // der Strategie ggf. schon abgefangen? Weil: Wenn es nur um die Sorge geht,
-    // das eine Nachricht schon "durch den Nutzer" anders geschrieben wurde,
-    // fängt dies bereits der ChatRoom ab.
-    // Die Überprüfung hier war vor dem Hintergrund der Sorge hinzugefügt worden,
-    // dass die Nachricht wegen Verschluckern in Kafka / beim Strategiewechsel / ??
-    // doppelt aufschlägt...
-    if (existingMessage == null)
-    {
-      messages.put(message.getKey(), message);
-    }
-    else
-    {
-      if (!existingMessage.getMessageText().equals(message.getMessageText()))
-      {
-        throw new MessageMutationException(existingMessage, message.getMessageText());
-      }
-
-      // Warn and emit existing message
-      log.warn(
-          "Keeping existing message with {}@{} for {}",
-          existingMessage.getSerialNumber(),
-          existingMessage.getTimestamp(),
-          existingMessage.getKey());
-    }
+    messages.put(message.getKey(), message)
   }
 
   @Override