NEU
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / persistence / kafka / KafkaChatRoomService.java
index f036efe..3a8c2c6 100644 (file)
@@ -3,106 +3,41 @@ package de.juplo.kafka.chat.backend.persistence.kafka;
 import de.juplo.kafka.chat.backend.domain.ChatRoomService;
 import de.juplo.kafka.chat.backend.domain.Message;
 import de.juplo.kafka.chat.backend.domain.MessageMutationException;
+import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.TopicPartition;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
 import java.time.LocalDateTime;
 import java.util.LinkedHashMap;
+import java.util.UUID;
 
 
+@RequiredArgsConstructor
 @Slf4j
 public class KafkaChatRoomService implements ChatRoomService
 {
-  private final Producer<String, MessageTo> producer;
-  private final TopicPartition tp;
+  private final KafkaChatHomeService kafkaChatHomeService;
+  private final UUID chatRoomId;
 
   private final LinkedHashMap<Message.MessageKey, Message> messages = new LinkedHashMap<>();
 
 
-  public KafkaChatRoomService(
-      Producer<String, MessageTo> producer,
-      TopicPartition tp)
-  {
-    this.producer = producer;
-    this.tp = tp;
-  }
-
-
   @Override
-  synchronized public Mono<Message> persistMessage(
+  public Mono<Message> persistMessage(
     Message.MessageKey key,
     LocalDateTime timestamp,
     String text)
   {
-    return Mono.create(sink ->
-    {
-      ProducerRecord<String, MessageTo> record =
-          new ProducerRecord<>(
-              tp.topic(),
-              tp.partition(),
-              timestamp.toEpochSecond(zoneOffset),
-              chatRoomId.toString(),
-              MessageTo.of(key.getUsername(), key.getMessageId(), text));
-
-      producer.send(record, ((metadata, exception) ->
-      {
-        if (metadata != null)
-        {
-          // On successful send
-          {
-            // Emit new message
-            Message message = new Message(key, metadata.offset(), timestamp, text);
-            kafkaChatRoomService.addMessage(message);
-          }
-
-          sink.success();
-        }
-        else
-        {
-          // On send-failure
-          sink.error(exception);
-        }
-      }));
-    });
+    return kafkaChatHomeService
+        .sendMessage(chatRoomId, key, timestamp, text)
+        .doOnSuccess(message -> persistMessage(message));
   }
 
-  /**
-   * {@code synchronized} ist nicht nötig, da Aufruf immer indirekt über
-   * {@link #persistMessage(Message.MessageKey, LocalDateTime, String)}
-   */
-  protected void addMessage(Message message) throws MessageMutationException
+  public void persistMessage(Message message)
   {
-    Message existingMessage = messages.get(message.getKey());
-
-    // TODO: Ist der Test nötig, oder wird das durch den Kontrollierten Wechsel
-    // der Strategie ggf. schon abgefangen? Weil: Wenn es nur um die Sorge geht,
-    // das eine Nachricht schon "durch den Nutzer" anders geschrieben wurde,
-    // fängt dies bereits der ChatRoom ab.
-    // Die Überprüfung hier war vor dem Hintergrund der Sorge hinzugefügt worden,
-    // dass die Nachricht wegen Verschluckern in Kafka / beim Strategiewechsel / ??
-    // doppelt aufschlägt...
-    if (existingMessage == null)
-    {
-      messages.put(message.getKey(), message);
-    }
-    else
-    {
-      if (!existingMessage.getMessageText().equals(message.getMessageText()))
-      {
-        throw new MessageMutationException(existingMessage, message.getMessageText());
-      }
-
-      // Warn and emit existing message
-      log.warn(
-          "Keeping existing message with {}@{} for {}",
-          existingMessage.getSerialNumber(),
-          existingMessage.getTimestamp(),
-          existingMessage.getKey());
-    }
+    messages.put(message.getKey(), message)
   }
 
   @Override