TMP:test:FIX
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / domain / ChatRoomData.java
index 873e58e..20c046d 100644 (file)
@@ -1,5 +1,8 @@
 package de.juplo.kafka.chat.backend.domain;
 
+import de.juplo.kafka.chat.backend.domain.exceptions.ChatRoomInactiveException;
+import de.juplo.kafka.chat.backend.domain.exceptions.InvalidUsernameException;
+import de.juplo.kafka.chat.backend.domain.exceptions.MessageMutationException;
 import lombok.extern.slf4j.Slf4j;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
@@ -13,29 +16,29 @@ import java.util.regex.Pattern;
 
 
 @Slf4j
-public class ChatRoom
+public class ChatRoomData
 {
   public final static Pattern VALID_USER = Pattern.compile("^[a-z0-9-]{2,}$");
 
+  private final ChatMessageService service;
   private final Clock clock;
-  private final ChatRoomService service;
-  private final int bufferSize;
+  private final int historyLimit;
   private Sinks.Many<Message> sink;
+  private volatile boolean active = false;
 
 
-  public ChatRoom(
+  public ChatRoomData(
       Clock clock,
-      ChatRoomService service,
-      int bufferSize)
+      ChatMessageService service,
+      int historyLimit)
   {
-    log.info("Created ChatRoom with buffer-size {}", bufferSize);
+    log.info("Created ChatRoom with history-limit {}", historyLimit);
     this.clock = clock;
     this.service = service;
-    this.bufferSize = bufferSize;
+    this.historyLimit = historyLimit;
     // @RequiredArgsConstructor unfortunately not possible, because
-    // the `bufferSize` is not set, if `createSink()` is called
+    // the `historyLimit` is not set, if `createSink()` is called
     // from the variable declaration!
-    this.sink = createSink();
   }
 
 
@@ -62,8 +65,8 @@ public class ChatRoom
             sink.error(new MessageMutationException(existing, text));
           }
         })
-        .switchIfEmpty(
-            Mono
+        .switchIfEmpty(active
+            Mono
                 .defer(() -> service.persistMessage(key, LocalDateTime.now(clock), text))
                 .doOnNext(m ->
                 {
@@ -72,11 +75,12 @@ public class ChatRoom
                   {
                     log.warn("Emitting of message failed with {} for {}", result.name(), m);
                   }
-                }));
+                })
+            : Mono.error(new ChatRoomInactiveException(service.getChatRoomId())));
   }
 
 
-  public ChatRoomService getChatRoomService()
+  public ChatMessageService getChatRoomService()
   {
     return service;
   }
@@ -89,9 +93,13 @@ public class ChatRoom
 
   synchronized public Flux<Message> listen()
   {
-    return sink
-        .asFlux()
-        .doOnCancel(() -> sink = createSink()); // Sink hast to be recreated on auto-cancel!
+    return active
+        ? sink
+            .asFlux()
+            .doOnCancel(() -> sink = createSink()) // Sink hast to be recreated on auto-cancel!
+        : Flux
+            .error(new ChatRoomInactiveException(service.getChatRoomId()));
+
   }
 
   public Flux<Message> getMessages()
@@ -104,11 +112,31 @@ public class ChatRoom
     return service.getMessages(first, last);
   }
 
+  public void activate()
+  {
+    if (active)
+    {
+      log.info("{} is already active!", service.getChatRoomId());
+      return;
+    }
+
+    log.info("{} is being activated", service.getChatRoomId());
+    this.sink = createSink();
+    active = true;
+  }
+
+  public void deactivate()
+  {
+    log.info("{} is being deactivated", service.getChatRoomId());
+    active = false;
+    sink.emitComplete(Sinks.EmitFailureHandler.FAIL_FAST);
+  }
+
   private Sinks.Many<Message> createSink()
   {
     return Sinks
         .many()
-        .multicast()
-        .onBackpressureBuffer(bufferSize);
+        .replay()
+        .limit(historyLimit);
   }
 }