NEU
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / persistence / kafka / KafkaChatRoomService.java
index 4e7eb36..ed155df 100644 (file)
@@ -3,45 +3,26 @@ package de.juplo.kafka.chat.backend.persistence.kafka;
 import de.juplo.kafka.chat.backend.domain.ChatRoomService;
 import de.juplo.kafka.chat.backend.domain.Message;
 import de.juplo.kafka.chat.backend.domain.MessageMutationException;
+import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.TopicPartition;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
 import java.time.LocalDateTime;
-import java.time.ZoneOffset;
 import java.util.LinkedHashMap;
 import java.util.UUID;
 
 
+@RequiredArgsConstructor
 @Slf4j
 public class KafkaChatRoomService implements ChatRoomService
 {
-  private final Producer<String, MessageTo> producer;
-  private final TopicPartition tp;
+  private final KafkaChatHomeService kafkaChatHomeService;
   private final UUID chatRoomId;
-  private final ZoneOffset zoneOffset;
 
   private final LinkedHashMap<Message.MessageKey, Message> messages = new LinkedHashMap<>();
 
-  private MessageHandlingStrategy strategy;
-
-
-  public KafkaChatRoomService(
-      Producer<String, MessageTo> producer,
-      TopicPartition tp,
-      UUID chatRoomId,
-      ZoneOffset zoneOffset)
-  {
-    this.producer = producer;
-    this.tp = tp;
-    this.chatRoomId = chatRoomId;
-    this.zoneOffset = zoneOffset;
-    this.strategy = new ChatroomInactiveMessageHandlingStrategy(tp);
-  }
-
 
   @Override
   public Mono<Message> persistMessage(
@@ -49,16 +30,27 @@ public class KafkaChatRoomService implements ChatRoomService
     LocalDateTime timestamp,
     String text)
   {
-    return strategy.persistMessage(key, timestamp, text);
+    return kafkaChatHomeService.sendMessage(chatRoomId, key, timestamp, text);
   }
 
-  synchronized protected void addMessage(Message message) throws MessageMutationException
+  /**
+   * {@code synchronized} ist nicht nötig, da Aufruf immer indirekt über
+   * {@link #persistMessage(Message.MessageKey, LocalDateTime, String)}
+   */
+  protected void addMessage(Message message) throws MessageMutationException
   {
     Message existingMessage = messages.get(message.getKey());
 
+    // TODO: Ist der Test nötig, oder wird das durch den Kontrollierten Wechsel
+    // der Strategie ggf. schon abgefangen? Weil: Wenn es nur um die Sorge geht,
+    // das eine Nachricht schon "durch den Nutzer" anders geschrieben wurde,
+    // fängt dies bereits der ChatRoom ab.
+    // Die Überprüfung hier war vor dem Hintergrund der Sorge hinzugefügt worden,
+    // dass die Nachricht wegen Verschluckern in Kafka / beim Strategiewechsel / ??
+    // doppelt aufschlägt...
     if (existingMessage == null)
     {
-      messages.put(existingMessage.getKey(), existingMessage);
+      messages.put(message.getKey(), message);
     }
     else
     {
@@ -79,6 +71,8 @@ public class KafkaChatRoomService implements ChatRoomService
   @Override
   synchronized public Mono<Message> getMessage(Message.MessageKey key)
   {
+    // TODO: Aufrufe, auf eine Nachricht (einge gewisse Zeit) warten lassen
+    // und dann bedienen, wenn der der Callback vom Producer aufgerufen wird?
     return Mono.fromSupplier(() -> messages.get(key));
   }