WIP
authorKai Moritz <kai@juplo.de>
Sat, 18 Feb 2023 09:25:23 +0000 (10:25 +0100)
committerKai Moritz <kai@juplo.de>
Fri, 24 Feb 2023 11:14:59 +0000 (12:14 +0100)
src/main/java/de/juplo/kafka/chat/backend/api/ChatBackendController.java
src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomActiveMessageHandlingStrategy.java
src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatRoomService.java
src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/MessageHandlingStrategy.java

index 4db77ee..339451a 100644 (file)
@@ -70,7 +70,7 @@ public class ChatBackendController
             .flatMap(chatroom -> put(chatroom, username, messageId, text));
   }
 
-  public Mono<MessageTo> put(
+  private Mono<MessageTo> put(
       ChatRoom chatroom,
       String username,
       Long messageId,
index 8eac990..69b6fe9 100644 (file)
@@ -1,19 +1,70 @@
 package de.juplo.kafka.chat.backend.persistence.kafka;
 
 import de.juplo.kafka.chat.backend.domain.Message;
+import de.juplo.kafka.chat.backend.domain.MessageMutationException;
 import lombok.RequiredArgsConstructor;
+import lombok.extern.log4j.Log4j;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.TopicPartition;
+import reactor.core.publisher.Mono;
+
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
+import java.util.UUID;
 
 
 @RequiredArgsConstructor
+@Log4j
 class ChatRoomActiveMessageHandlingStrategy implements MessageHandlingStrategy
 {
+  private final Producer<String, MessageTo> producer;
   private final TopicPartition tp;
+  private final UUID chatRoomId;
+  private final ZoneOffset zoneOffset;
+  private final KafkaChatRoomService chatRoomService;
+
+
+  @Override
+  public Mono<Message> persistMessage(
+      Message.MessageKey key,
+      LocalDateTime timestamp,
+      String text)
+  {
+    return Mono.create(sink ->
+    {
+      ProducerRecord<String, MessageTo> record =
+          new ProducerRecord<>(
+              tp.topic(),
+              tp.partition(),
+              timestamp.toEpochSecond(zoneOffset),
+              chatRoomId.toString(),
+              MessageTo.of(key.getUsername(), key.getMessageId(), text));
+
+      producer.send(record, ((metadata, exception) ->
+      {
+        if (metadata != null)
+        {
+          // On successful send
+          {
+            // Emit new message
+            message = new Message(key, metadata.offset(), timestamp, text);
+            messages.put(message.getKey(), message);
+          }
+
+          sink.success();
+        }
+        else
+        {
+          // On send-failure
+          sink.error(exception);
+        }
+      }));
+    });
+  }
 
   @Override
   public MessageHandlingStrategy handleMessage(Message message)
   {
-    chatrooms[tp.partition()].put()
-    return this;
   }
 }
index 79f2e63..4e7eb36 100644 (file)
@@ -3,7 +3,6 @@ package de.juplo.kafka.chat.backend.persistence.kafka;
 import de.juplo.kafka.chat.backend.domain.ChatRoomService;
 import de.juplo.kafka.chat.backend.domain.Message;
 import de.juplo.kafka.chat.backend.domain.MessageMutationException;
-import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerRecord;
@@ -18,7 +17,6 @@ import java.util.UUID;
 
 
 @Slf4j
-@RequiredArgsConstructor
 public class KafkaChatRoomService implements ChatRoomService
 {
   private final Producer<String, MessageTo> producer;
@@ -28,7 +26,21 @@ public class KafkaChatRoomService implements ChatRoomService
 
   private final LinkedHashMap<Message.MessageKey, Message> messages = new LinkedHashMap<>();
 
-  private long offset = 0l;
+  private MessageHandlingStrategy strategy;
+
+
+  public KafkaChatRoomService(
+      Producer<String, MessageTo> producer,
+      TopicPartition tp,
+      UUID chatRoomId,
+      ZoneOffset zoneOffset)
+  {
+    this.producer = producer;
+    this.tp = tp;
+    this.chatRoomId = chatRoomId;
+    this.zoneOffset = zoneOffset;
+    this.strategy = new ChatroomInactiveMessageHandlingStrategy(tp);
+  }
 
 
   @Override
@@ -37,65 +49,41 @@ public class KafkaChatRoomService implements ChatRoomService
     LocalDateTime timestamp,
     String text)
   {
-    return Mono.create(sink ->
-    {
-      ProducerRecord<String, MessageTo> record =
-          new ProducerRecord<>(
-              tp.topic(),
-              tp.partition(),
-              timestamp.toEpochSecond(zoneOffset),
-              chatRoomId.toString(),
-              MessageTo.of(key.getUsername(), key.getMessageId(), text));
+    return strategy.persistMessage(key, timestamp, text);
+  }
 
-      producer.send(record, ((metadata, exception) ->
+  synchronized protected void addMessage(Message message) throws MessageMutationException
+  {
+    Message existingMessage = messages.get(message.getKey());
+
+    if (existingMessage == null)
+    {
+      messages.put(existingMessage.getKey(), existingMessage);
+    }
+    else
+    {
+      if (!existingMessage.getMessageText().equals(message.getMessageText()))
       {
-        if (metadata != null)
-        {
-          // On successful send
-          Message message = messages.get(key);
-          if (message != null)
-          {
-            if (message.getMessageText().equals(text))
-            {
-              // Warn and emit existing message
-              log.warn(
-                  "Keeping existing message with {}@{} for {}",
-                  message.getSerialNumber(),
-                  message.getTimestamp(), key);
-            }
-            else
-            {
-              // Emit error and abort
-              sink.error(new MessageMutationException(message, text));
-              return;
-            }
-          }
-          else
-          {
-            // Emit new message
-            message = new Message(key, metadata.offset(), timestamp, text);
-            messages.put(message.getKey(), message);
-          }
+        throw new MessageMutationException(existingMessage, message.getMessageText());
+      }
 
-          sink.success();
-        }
-        else
-        {
-          // On send-failure
-          sink.error(exception);
-        }
-      }));
-    });
+      // Warn and emit existing message
+      log.warn(
+          "Keeping existing message with {}@{} for {}",
+          existingMessage.getSerialNumber(),
+          existingMessage.getTimestamp(),
+          existingMessage.getKey());
+    }
   }
 
   @Override
-  public Mono<Message> getMessage(Message.MessageKey key)
+  synchronized public Mono<Message> getMessage(Message.MessageKey key)
   {
     return Mono.fromSupplier(() -> messages.get(key));
   }
 
   @Override
-  public Flux<Message> getMessages(long first, long last)
+  synchronized public Flux<Message> getMessages(long first, long last)
   {
     return Flux.fromStream(messages
       .values()
index 194b4d0..097ad73 100644 (file)
@@ -1,9 +1,17 @@
 package de.juplo.kafka.chat.backend.persistence.kafka;
 
 import de.juplo.kafka.chat.backend.domain.Message;
+import reactor.core.publisher.Mono;
+
+import java.time.LocalDateTime;
 
 
 interface MessageHandlingStrategy
 {
+  Mono<Message> persistMessage(
+      Message.MessageKey key,
+      LocalDateTime timestamp,
+      String text);
+
   MessageHandlingStrategy handleMessage(Message message);
 }