TMP:test:FIX
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / domain / ChatRoomData.java
index 18ca0bf..20c046d 100644 (file)
@@ -1,5 +1,6 @@
 package de.juplo.kafka.chat.backend.domain;
 
+import de.juplo.kafka.chat.backend.domain.exceptions.ChatRoomInactiveException;
 import de.juplo.kafka.chat.backend.domain.exceptions.InvalidUsernameException;
 import de.juplo.kafka.chat.backend.domain.exceptions.MessageMutationException;
 import lombok.extern.slf4j.Slf4j;
@@ -19,25 +20,25 @@ public class ChatRoomData
 {
   public final static Pattern VALID_USER = Pattern.compile("^[a-z0-9-]{2,}$");
 
-  private final ChatRoomService service;
+  private final ChatMessageService service;
   private final Clock clock;
-  private final int bufferSize;
+  private final int historyLimit;
   private Sinks.Many<Message> sink;
+  private volatile boolean active = false;
 
 
   public ChatRoomData(
       Clock clock,
-      ChatRoomService service,
-      int bufferSize)
+      ChatMessageService service,
+      int historyLimit)
   {
-    log.info("Created ChatRoom with buffer-size {}", bufferSize);
+    log.info("Created ChatRoom with history-limit {}", historyLimit);
     this.clock = clock;
     this.service = service;
-    this.bufferSize = bufferSize;
+    this.historyLimit = historyLimit;
     // @RequiredArgsConstructor unfortunately not possible, because
-    // the `bufferSize` is not set, if `createSink()` is called
+    // the `historyLimit` is not set, if `createSink()` is called
     // from the variable declaration!
-    this.sink = createSink();
   }
 
 
@@ -64,8 +65,8 @@ public class ChatRoomData
             sink.error(new MessageMutationException(existing, text));
           }
         })
-        .switchIfEmpty(
-            Mono
+        .switchIfEmpty(active
+            Mono
                 .defer(() -> service.persistMessage(key, LocalDateTime.now(clock), text))
                 .doOnNext(m ->
                 {
@@ -74,11 +75,12 @@ public class ChatRoomData
                   {
                     log.warn("Emitting of message failed with {} for {}", result.name(), m);
                   }
-                }));
+                })
+            : Mono.error(new ChatRoomInactiveException(service.getChatRoomId())));
   }
 
 
-  public ChatRoomService getChatRoomService()
+  public ChatMessageService getChatRoomService()
   {
     return service;
   }
@@ -91,9 +93,13 @@ public class ChatRoomData
 
   synchronized public Flux<Message> listen()
   {
-    return sink
-        .asFlux()
-        .doOnCancel(() -> sink = createSink()); // Sink hast to be recreated on auto-cancel!
+    return active
+        ? sink
+            .asFlux()
+            .doOnCancel(() -> sink = createSink()) // Sink hast to be recreated on auto-cancel!
+        : Flux
+            .error(new ChatRoomInactiveException(service.getChatRoomId()));
+
   }
 
   public Flux<Message> getMessages()
@@ -106,11 +112,31 @@ public class ChatRoomData
     return service.getMessages(first, last);
   }
 
+  public void activate()
+  {
+    if (active)
+    {
+      log.info("{} is already active!", service.getChatRoomId());
+      return;
+    }
+
+    log.info("{} is being activated", service.getChatRoomId());
+    this.sink = createSink();
+    active = true;
+  }
+
+  public void deactivate()
+  {
+    log.info("{} is being deactivated", service.getChatRoomId());
+    active = false;
+    sink.emitComplete(Sinks.EmitFailureHandler.FAIL_FAST);
+  }
+
   private Sinks.Many<Message> createSink()
   {
     return Sinks
         .many()
-        .multicast()
-        .onBackpressureBuffer(bufferSize);
+        .replay()
+        .limit(historyLimit);
   }
 }