NEU
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / persistence / kafka / KafkaChatRoomService.java
index 4e7eb36..f036efe 100644 (file)
@@ -11,9 +11,7 @@ import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
 import java.time.LocalDateTime;
-import java.time.ZoneOffset;
 import java.util.LinkedHashMap;
-import java.util.UUID;
 
 
 @Slf4j
@@ -21,44 +19,75 @@ public class KafkaChatRoomService implements ChatRoomService
 {
   private final Producer<String, MessageTo> producer;
   private final TopicPartition tp;
-  private final UUID chatRoomId;
-  private final ZoneOffset zoneOffset;
 
   private final LinkedHashMap<Message.MessageKey, Message> messages = new LinkedHashMap<>();
 
-  private MessageHandlingStrategy strategy;
-
 
   public KafkaChatRoomService(
       Producer<String, MessageTo> producer,
-      TopicPartition tp,
-      UUID chatRoomId,
-      ZoneOffset zoneOffset)
+      TopicPartition tp)
   {
     this.producer = producer;
     this.tp = tp;
-    this.chatRoomId = chatRoomId;
-    this.zoneOffset = zoneOffset;
-    this.strategy = new ChatroomInactiveMessageHandlingStrategy(tp);
   }
 
 
   @Override
-  public Mono<Message> persistMessage(
+  synchronized public Mono<Message> persistMessage(
     Message.MessageKey key,
     LocalDateTime timestamp,
     String text)
   {
-    return strategy.persistMessage(key, timestamp, text);
+    return Mono.create(sink ->
+    {
+      ProducerRecord<String, MessageTo> record =
+          new ProducerRecord<>(
+              tp.topic(),
+              tp.partition(),
+              timestamp.toEpochSecond(zoneOffset),
+              chatRoomId.toString(),
+              MessageTo.of(key.getUsername(), key.getMessageId(), text));
+
+      producer.send(record, ((metadata, exception) ->
+      {
+        if (metadata != null)
+        {
+          // On successful send
+          {
+            // Emit new message
+            Message message = new Message(key, metadata.offset(), timestamp, text);
+            kafkaChatRoomService.addMessage(message);
+          }
+
+          sink.success();
+        }
+        else
+        {
+          // On send-failure
+          sink.error(exception);
+        }
+      }));
+    });
   }
 
-  synchronized protected void addMessage(Message message) throws MessageMutationException
+  /**
+   * {@code synchronized} ist nicht nötig, da Aufruf immer indirekt über
+   * {@link #persistMessage(Message.MessageKey, LocalDateTime, String)}
+   */
+  protected void addMessage(Message message) throws MessageMutationException
   {
     Message existingMessage = messages.get(message.getKey());
 
+    // TODO: Ist der Test nötig, oder wird das durch den Kontrollierten Wechsel
+    // der Strategie ggf. schon abgefangen? Weil: Wenn es nur um die Sorge geht,
+    // das eine Nachricht schon "durch den Nutzer" anders geschrieben wurde,
+    // fängt dies bereits der ChatRoom ab.
+    // Die Überprüfung hier war vor dem Hintergrund der Sorge hinzugefügt worden,
+    // dass die Nachricht wegen Verschluckern in Kafka / beim Strategiewechsel / ??
+    // doppelt aufschlägt...
     if (existingMessage == null)
     {
-      messages.put(existingMessage.getKey(), existingMessage);
+      messages.put(message.getKey(), message);
     }
     else
     {
@@ -79,6 +108,8 @@ public class KafkaChatRoomService implements ChatRoomService
   @Override
   synchronized public Mono<Message> getMessage(Message.MessageKey key)
   {
+    // TODO: Aufrufe, auf eine Nachricht (einge gewisse Zeit) warten lassen
+    // und dann bedienen, wenn der der Callback vom Producer aufgerufen wird?
     return Mono.fromSupplier(() -> messages.get(key));
   }