NEU
authorKai Moritz <kai@juplo.de>
Sat, 15 Apr 2023 09:16:08 +0000 (11:16 +0200)
committerKai Moritz <kai@juplo.de>
Sat, 15 Apr 2023 09:16:08 +0000 (11:16 +0200)
src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoom.java
src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHomeService.java
src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatRoomService.java

index cffc0ad..b946309 100644 (file)
@@ -80,6 +80,11 @@ public class ChatRoom extends ChatRoomInfo
   }
 
 
+  public ChatRoomService getChatRoomService()
+  {
+    return service;
+  }
+
   public Mono<Message> getMessage(String username, Long messageId)
   {
     Message.MessageKey key = Message.MessageKey.of(username, messageId);
index 5133d1a..f105902 100644 (file)
@@ -137,11 +137,13 @@ public class KafkaChatHomeService implements ChatHomeService, Runnable, Consumer
             UUID chatRoomId = UUID.fromString(record.key());
             MessageTo messageTo = record.value();
             ChatRoom chatRoom = chatRoomMaps[record.partition()].get(chatRoomId);
-            Mono<Message> result = chatRoom.addMessage(
-                messageTo.getId(),
-                messageTo.getUser(),
-                messageTo.getText());
-            result.block().
+            KafkaChatRoomService kafkaChatRoomService =
+                (KafkaChatRoomService) chatRoom.getChatRoomService();
+            Message.MessageKey key = Message.MessageKey.of(messageTo.getUser(), messageTo.getId());
+            Instant instant = Instant.ofEpochSecond(record.timestamp());
+            LocalDateTime timestamp = LocalDateTime.ofInstant(instant, zoneId);
+            Message message = new Message(key, record.offset(), timestamp, messageTo.getText());
+            kafkaChatRoomService.persistMessage(message);
           }
         }
         else
index ed155df..3a8c2c6 100644 (file)
@@ -30,42 +30,14 @@ public class KafkaChatRoomService implements ChatRoomService
     LocalDateTime timestamp,
     String text)
   {
-    return kafkaChatHomeService.sendMessage(chatRoomId, key, timestamp, text);
+    return kafkaChatHomeService
+        .sendMessage(chatRoomId, key, timestamp, text)
+        .doOnSuccess(message -> persistMessage(message));
   }
 
-  /**
-   * {@code synchronized} ist nicht nötig, da Aufruf immer indirekt über
-   * {@link #persistMessage(Message.MessageKey, LocalDateTime, String)}
-   */
-  protected void addMessage(Message message) throws MessageMutationException
+  public void persistMessage(Message message)
   {
-    Message existingMessage = messages.get(message.getKey());
-
-    // TODO: Ist der Test nötig, oder wird das durch den Kontrollierten Wechsel
-    // der Strategie ggf. schon abgefangen? Weil: Wenn es nur um die Sorge geht,
-    // das eine Nachricht schon "durch den Nutzer" anders geschrieben wurde,
-    // fängt dies bereits der ChatRoom ab.
-    // Die Überprüfung hier war vor dem Hintergrund der Sorge hinzugefügt worden,
-    // dass die Nachricht wegen Verschluckern in Kafka / beim Strategiewechsel / ??
-    // doppelt aufschlägt...
-    if (existingMessage == null)
-    {
-      messages.put(message.getKey(), message);
-    }
-    else
-    {
-      if (!existingMessage.getMessageText().equals(message.getMessageText()))
-      {
-        throw new MessageMutationException(existingMessage, message.getMessageText());
-      }
-
-      // Warn and emit existing message
-      log.warn(
-          "Keeping existing message with {}@{} for {}",
-          existingMessage.getSerialNumber(),
-          existingMessage.getTimestamp(),
-          existingMessage.getKey());
-    }
+    messages.put(message.getKey(), message)
   }
 
   @Override