WIP
authorKai Moritz <kai@juplo.de>
Sun, 26 Feb 2023 14:07:22 +0000 (15:07 +0100)
committerKai Moritz <kai@juplo.de>
Sun, 26 Feb 2023 14:35:29 +0000 (15:35 +0100)
src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomActiveMessageHandlingStrategy.java
src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatRoomService.java

index 69b6fe9..49ace7a 100644 (file)
@@ -18,6 +18,7 @@ import java.util.UUID;
 @Log4j
 class ChatRoomActiveMessageHandlingStrategy implements MessageHandlingStrategy
 {
+  private final KafkaChatRoomService kafkaChatRoomService;
   private final Producer<String, MessageTo> producer;
   private final TopicPartition tp;
   private final UUID chatRoomId;
@@ -48,8 +49,8 @@ class ChatRoomActiveMessageHandlingStrategy implements MessageHandlingStrategy
           // On successful send
           {
             // Emit new message
-            message = new Message(key, metadata.offset(), timestamp, text);
-            messages.put(message.getKey(), message);
+            Message message = new Message(key, metadata.offset(), timestamp, text);
+            kafkaChatRoomService.addMessage(message);
           }
 
           sink.success();
index 4e7eb36..91b5031 100644 (file)
@@ -5,7 +5,6 @@ import de.juplo.kafka.chat.backend.domain.Message;
 import de.juplo.kafka.chat.backend.domain.MessageMutationException;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.producer.Producer;
-import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.TopicPartition;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
@@ -26,7 +25,7 @@ public class KafkaChatRoomService implements ChatRoomService
 
   private final LinkedHashMap<Message.MessageKey, Message> messages = new LinkedHashMap<>();
 
-  private MessageHandlingStrategy strategy;
+  private volatile MessageHandlingStrategy strategy;
 
 
   public KafkaChatRoomService(
@@ -44,7 +43,7 @@ public class KafkaChatRoomService implements ChatRoomService
 
 
   @Override
-  public Mono<Message> persistMessage(
+  synchronized public Mono<Message> persistMessage(
     Message.MessageKey key,
     LocalDateTime timestamp,
     String text)
@@ -52,13 +51,20 @@ public class KafkaChatRoomService implements ChatRoomService
     return strategy.persistMessage(key, timestamp, text);
   }
 
-  synchronized protected void addMessage(Message message) throws MessageMutationException
+  protected void addMessage(Message message) throws MessageMutationException
   {
     Message existingMessage = messages.get(message.getKey());
 
+    // TODO: Ist der Test nötig, oder wird das durch den Kontrollierten Wechsel
+    // der Strategie ggf. schon abgefangen? Weil: Wenn es nur um die Sorge geht,
+    // das eine Nachricht schon "durch den Nutzer" anders geschrieben wurde,
+    // fängt dies bereits der ChatRoom ab.
+    // Die Überprüfung hier war vor dem Hintergrund der Sorge hinzugefügt worden,
+    // dass die Nachricht wegen Verschluckern in Kafka / beim Strategiewechsel / ??
+    // doppelt aufschlägt...
     if (existingMessage == null)
     {
-      messages.put(existingMessage.getKey(), existingMessage);
+      messages.put(message.getKey(), message);
     }
     else
     {
@@ -79,6 +85,8 @@ public class KafkaChatRoomService implements ChatRoomService
   @Override
   synchronized public Mono<Message> getMessage(Message.MessageKey key)
   {
+    // TODO: Aufrufe, auf eine Nachricht (einge gewisse Zeit) warten lassen
+    // und dann bedienen, wenn der der Callback vom Producer aufgerufen wird?
     return Mono.fromSupplier(() -> messages.get(key));
   }