NEU
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / persistence / kafka / KafkaChatRoomService.java
index 79f2e63..ed155df 100644 (file)
@@ -5,31 +5,24 @@ import de.juplo.kafka.chat.backend.domain.Message;
 import de.juplo.kafka.chat.backend.domain.MessageMutationException;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.TopicPartition;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
 import java.time.LocalDateTime;
-import java.time.ZoneOffset;
 import java.util.LinkedHashMap;
 import java.util.UUID;
 
 
-@Slf4j
 @RequiredArgsConstructor
+@Slf4j
 public class KafkaChatRoomService implements ChatRoomService
 {
-  private final Producer<String, MessageTo> producer;
-  private final TopicPartition tp;
+  private final KafkaChatHomeService kafkaChatHomeService;
   private final UUID chatRoomId;
-  private final ZoneOffset zoneOffset;
 
   private final LinkedHashMap<Message.MessageKey, Message> messages = new LinkedHashMap<>();
 
-  private long offset = 0l;
-
 
   @Override
   public Mono<Message> persistMessage(
@@ -37,65 +30,54 @@ public class KafkaChatRoomService implements ChatRoomService
     LocalDateTime timestamp,
     String text)
   {
-    return Mono.create(sink ->
-    {
-      ProducerRecord<String, MessageTo> record =
-          new ProducerRecord<>(
-              tp.topic(),
-              tp.partition(),
-              timestamp.toEpochSecond(zoneOffset),
-              chatRoomId.toString(),
-              MessageTo.of(key.getUsername(), key.getMessageId(), text));
+    return kafkaChatHomeService.sendMessage(chatRoomId, key, timestamp, text);
+  }
 
-      producer.send(record, ((metadata, exception) ->
+  /**
+   * {@code synchronized} ist nicht nötig, da Aufruf immer indirekt über
+   * {@link #persistMessage(Message.MessageKey, LocalDateTime, String)}
+   */
+  protected void addMessage(Message message) throws MessageMutationException
+  {
+    Message existingMessage = messages.get(message.getKey());
+
+    // TODO: Ist der Test nötig, oder wird das durch den Kontrollierten Wechsel
+    // der Strategie ggf. schon abgefangen? Weil: Wenn es nur um die Sorge geht,
+    // das eine Nachricht schon "durch den Nutzer" anders geschrieben wurde,
+    // fängt dies bereits der ChatRoom ab.
+    // Die Überprüfung hier war vor dem Hintergrund der Sorge hinzugefügt worden,
+    // dass die Nachricht wegen Verschluckern in Kafka / beim Strategiewechsel / ??
+    // doppelt aufschlägt...
+    if (existingMessage == null)
+    {
+      messages.put(message.getKey(), message);
+    }
+    else
+    {
+      if (!existingMessage.getMessageText().equals(message.getMessageText()))
       {
-        if (metadata != null)
-        {
-          // On successful send
-          Message message = messages.get(key);
-          if (message != null)
-          {
-            if (message.getMessageText().equals(text))
-            {
-              // Warn and emit existing message
-              log.warn(
-                  "Keeping existing message with {}@{} for {}",
-                  message.getSerialNumber(),
-                  message.getTimestamp(), key);
-            }
-            else
-            {
-              // Emit error and abort
-              sink.error(new MessageMutationException(message, text));
-              return;
-            }
-          }
-          else
-          {
-            // Emit new message
-            message = new Message(key, metadata.offset(), timestamp, text);
-            messages.put(message.getKey(), message);
-          }
+        throw new MessageMutationException(existingMessage, message.getMessageText());
+      }
 
-          sink.success();
-        }
-        else
-        {
-          // On send-failure
-          sink.error(exception);
-        }
-      }));
-    });
+      // Warn and emit existing message
+      log.warn(
+          "Keeping existing message with {}@{} for {}",
+          existingMessage.getSerialNumber(),
+          existingMessage.getTimestamp(),
+          existingMessage.getKey());
+    }
   }
 
   @Override
-  public Mono<Message> getMessage(Message.MessageKey key)
+  synchronized public Mono<Message> getMessage(Message.MessageKey key)
   {
+    // TODO: Aufrufe, auf eine Nachricht (einge gewisse Zeit) warten lassen
+    // und dann bedienen, wenn der der Callback vom Producer aufgerufen wird?
     return Mono.fromSupplier(() -> messages.get(key));
   }
 
   @Override
-  public Flux<Message> getMessages(long first, long last)
+  synchronized public Flux<Message> getMessages(long first, long last)
   {
     return Flux.fromStream(messages
       .values()