refactore: Renamed `PersistenceStrategy` to `ChatroomService` -- Rename
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / domain / Chatroom.java
index 62c9732..2261e02 100644 (file)
@@ -1,8 +1,6 @@
 package de.juplo.kafka.chat.backend.domain;
 
 import lombok.Getter;
-import lombok.RequiredArgsConstructor;
-import lombok.Value;
 import lombok.extern.slf4j.Slf4j;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
@@ -10,10 +8,8 @@ import reactor.core.publisher.Sinks;
 
 import java.time.LocalDateTime;
 import java.util.*;
-import java.util.stream.Stream;
 
 
-@RequiredArgsConstructor
 @Slf4j
 public class Chatroom
 {
@@ -21,70 +17,71 @@ public class Chatroom
   private final UUID id;
   @Getter
   private final String name;
-  private final LinkedHashMap<MessageKey, Message> messages = new LinkedHashMap<>();
-  private final Sinks.Many<Message> sink = Sinks.many().multicast().onBackpressureBuffer();
+  private final ChatroomService chatroomService;
+  private final int bufferSize;
+  private Sinks.Many<Message> sink;
 
-  synchronized public Mono<Message> addMessage(
-      Long id,
-      LocalDateTime timestamp,
-      String user,
-      String text)
+  public Chatroom(
+      UUID id,
+      String name,
+      ChatroomService chatroomService,
+      int bufferSize)
   {
-    return persistMessage(id, timestamp, user, text)
-        .doOnNext(message -> sink.tryEmitNext(message).orThrow());
+    this.id = id;
+    this.name = name;
+    this.chatroomService = chatroomService;
+    this.bufferSize = bufferSize;
+    this.sink = createSink();
   }
 
-  private Mono<Message> persistMessage(
+
+  synchronized public Mono<Message> addMessage(
       Long id,
       LocalDateTime timestamp,
       String user,
       String text)
   {
-    Message message = new Message(id, (long)messages.size(), timestamp, user, text);
-
-    MessageKey key = new MessageKey(user, id);
-    Message existing = messages.get(key);
-    if (existing != null)
-    {
-      log.info("Message with key {} already exists; {}", key, existing);
-      if (!message.equals(existing))
-        throw new MessageMutationException(message, existing);
-      return Mono.empty();
-    }
-
-    messages.put(key, message);
-    return Mono
-        .fromSupplier(() -> message)
-        .log();
+    return chatroomService
+        .persistMessage(Message.MessageKey.of(user, id), timestamp, text)
+        .doOnNext(message ->
+        {
+          Sinks.EmitResult result = sink.tryEmitNext(message);
+          if (result.isFailure())
+          {
+            log.warn("Emitting of message failed with {} for {}", result.name(), message);
+          }
+        });
   }
 
+
   public Mono<Message> getMessage(String username, Long messageId)
   {
-    return Mono.fromSupplier(() ->
-    {
-      MessageKey key = MessageKey.of(username, messageId);
-      return messages.get(key);
-    });
+    Message.MessageKey key = Message.MessageKey.of(username, messageId);
+    return chatroomService.getMessage(key);
   }
 
-  public Flux<Message> listen()
+  synchronized public Flux<Message> listen()
   {
-    return sink.asFlux();
+    return sink
+        .asFlux()
+        .doOnCancel(() -> sink = createSink()); // Sink hast to be recreated on auto-cancel!
   }
 
-  public Stream<Message> getMessages(long firstMessage)
+  public Flux<Message> getMessages()
   {
-    return messages
-        .values()
-        .stream()
-        .filter(message -> message.getSerialNumber() >= firstMessage);
+    return getMessages(0, Long.MAX_VALUE);
   }
 
+  public Flux<Message> getMessages(long first, long last)
+  {
+    return chatroomService.getMessages(first, last);
+  }
 
-  @Value(staticConstructor = "of")
-  static class MessageKey
+  private Sinks.Many<Message> createSink()
   {
-    String username;
-    Long messageId;
+    return Sinks
+        .many()
+        .multicast()
+        .onBackpressureBuffer(bufferSize);
   }
 }