WIP
authorKai Moritz <kai@juplo.de>
Wed, 25 Jan 2023 21:56:20 +0000 (22:56 +0100)
committerKai Moritz <kai@juplo.de>
Fri, 24 Feb 2023 11:14:59 +0000 (12:14 +0100)
src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatRoomService.java
src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/MessageTo.java

index 981c11f..1175d55 100644 (file)
@@ -6,12 +6,17 @@ import de.juplo.kafka.chat.backend.domain.MessageMutationException;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.TopicPartition;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
 import java.time.LocalDateTime;
+import java.time.ZoneOffset;
 import java.util.LinkedHashMap;
+import java.util.UUID;
+import java.util.concurrent.Future;
 
 
 @Slf4j
@@ -20,6 +25,8 @@ public class KafkaChatRoomService implements ChatRoomService
 {
   private final Producer<String, MessageTo> producer;
   private final TopicPartition tp;
+  private final UUID chatRoomId;
+  private final ZoneOffset zoneOffset;
 
   private final LinkedHashMap<Message.MessageKey, Message> messages = new LinkedHashMap<>();
 
@@ -27,16 +34,54 @@ public class KafkaChatRoomService implements ChatRoomService
 
 
   @Override
-  public Message persistMessage(
+  public Mono<Message> persistMessage(
     Message.MessageKey key,
     LocalDateTime timestamp,
     String text)
   {
-    
-    Mono.error(() -> new MessageMutationException(existing, text)));
-    Message message = new Message(key, (long)messages.size(), timestamp, text);
-    messages.put(message.getKey(), message);
-    return message;
+    return Mono.create(sink ->
+    {
+      ProducerRecord<String, MessageTo> record =
+          new ProducerRecord<>(
+              tp.topic(),
+              tp.partition(),
+              timestamp.toEpochSecond(zoneOffset),
+              chatRoomId.toString(),
+              MessageTo.of(key.getUsername(), key.getMessageId(), text));
+
+      producer.send(record, ((metadata, exception) ->
+      {
+        if (metadata != null)
+        {
+          Message message = messages.get(key);
+          if (message != null)
+          {
+            if (message.getMessageText().equals(text))
+            {
+              // Warn and emit existing message
+              log.warn(
+                  "Keeping existing message with {}@{} for {}",
+                  message.getSerialNumber(),
+                  message.getTimestamp(), key);
+            }
+            else
+            {
+              // Emit error and abort
+              sink.error(new MessageMutationException(message, text));
+              return;
+            }
+          }
+          else
+          {
+            // Emit new message
+            message = new Message(key, metadata.offset(), timestamp, text);
+            messages.put(message.getKey(), message);
+          }
+
+          sink.success();
+        }
+      }));
+    });
   }
 
   @Override
index 2de8ad5..0a867f1 100644 (file)
@@ -10,11 +10,11 @@ import java.time.LocalDateTime;
 
 @Data
 @NoArgsConstructor
-@AllArgsConstructor
+@AllArgsConstructor(staticName = "of")
 public class MessageTo
 {
-  private Long id;
   private String user;
+  private Long id;
   private String text;
 
   public Message toMessage(long offset, LocalDateTime timestamp)
@@ -26,8 +26,8 @@ public class MessageTo
   {
     return
         new MessageTo(
-            message.getId(),
             message.getUsername(),
+            message.getId(),
             message.getMessageText());
   }
 }