WIP
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / persistence / kafka / ChatRoomActiveMessageHandlingStrategy.java
index 8eac990..69b6fe9 100644 (file)
@@ -1,19 +1,70 @@
 package de.juplo.kafka.chat.backend.persistence.kafka;
 
 import de.juplo.kafka.chat.backend.domain.Message;
+import de.juplo.kafka.chat.backend.domain.MessageMutationException;
 import lombok.RequiredArgsConstructor;
+import lombok.extern.log4j.Log4j;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.TopicPartition;
+import reactor.core.publisher.Mono;
+
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
+import java.util.UUID;
 
 
 @RequiredArgsConstructor
+@Log4j
 class ChatRoomActiveMessageHandlingStrategy implements MessageHandlingStrategy
 {
+  private final Producer<String, MessageTo> producer;
   private final TopicPartition tp;
+  private final UUID chatRoomId;
+  private final ZoneOffset zoneOffset;
+  private final KafkaChatRoomService chatRoomService;
+
+
+  @Override
+  public Mono<Message> persistMessage(
+      Message.MessageKey key,
+      LocalDateTime timestamp,
+      String text)
+  {
+    return Mono.create(sink ->
+    {
+      ProducerRecord<String, MessageTo> record =
+          new ProducerRecord<>(
+              tp.topic(),
+              tp.partition(),
+              timestamp.toEpochSecond(zoneOffset),
+              chatRoomId.toString(),
+              MessageTo.of(key.getUsername(), key.getMessageId(), text));
+
+      producer.send(record, ((metadata, exception) ->
+      {
+        if (metadata != null)
+        {
+          // On successful send
+          {
+            // Emit new message
+            message = new Message(key, metadata.offset(), timestamp, text);
+            messages.put(message.getKey(), message);
+          }
+
+          sink.success();
+        }
+        else
+        {
+          // On send-failure
+          sink.error(exception);
+        }
+      }));
+    });
+  }
 
   @Override
   public MessageHandlingStrategy handleMessage(Message message)
   {
-    chatrooms[tp.partition()].put()
-    return this;
   }
 }