WIP:TMP:test -- FIX: `ChatRoomData` active/inactive
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / domain / ChatRoomData.java
index bff56c1..20c046d 100644 (file)
@@ -1,5 +1,6 @@
 package de.juplo.kafka.chat.backend.domain;
 
+import de.juplo.kafka.chat.backend.domain.exceptions.ChatRoomInactiveException;
 import de.juplo.kafka.chat.backend.domain.exceptions.InvalidUsernameException;
 import de.juplo.kafka.chat.backend.domain.exceptions.MessageMutationException;
 import lombok.extern.slf4j.Slf4j;
@@ -23,6 +24,7 @@ public class ChatRoomData
   private final Clock clock;
   private final int historyLimit;
   private Sinks.Many<Message> sink;
+  private volatile boolean active = false;
 
 
   public ChatRoomData(
@@ -37,7 +39,6 @@ public class ChatRoomData
     // @RequiredArgsConstructor unfortunately not possible, because
     // the `historyLimit` is not set, if `createSink()` is called
     // from the variable declaration!
-    this.sink = createSink();
   }
 
 
@@ -64,8 +65,8 @@ public class ChatRoomData
             sink.error(new MessageMutationException(existing, text));
           }
         })
-        .switchIfEmpty(
-            Mono
+        .switchIfEmpty(active
+            Mono
                 .defer(() -> service.persistMessage(key, LocalDateTime.now(clock), text))
                 .doOnNext(m ->
                 {
@@ -74,7 +75,8 @@ public class ChatRoomData
                   {
                     log.warn("Emitting of message failed with {} for {}", result.name(), m);
                   }
-                }));
+                })
+            : Mono.error(new ChatRoomInactiveException(service.getChatRoomId())));
   }
 
 
@@ -91,9 +93,13 @@ public class ChatRoomData
 
   synchronized public Flux<Message> listen()
   {
-    return sink
-        .asFlux()
-        .doOnCancel(() -> sink = createSink()); // Sink hast to be recreated on auto-cancel!
+    return active
+        ? sink
+            .asFlux()
+            .doOnCancel(() -> sink = createSink()) // Sink hast to be recreated on auto-cancel!
+        : Flux
+            .error(new ChatRoomInactiveException(service.getChatRoomId()));
+
   }
 
   public Flux<Message> getMessages()
@@ -106,9 +112,23 @@ public class ChatRoomData
     return service.getMessages(first, last);
   }
 
-  public void close()
+  public void activate()
+  {
+    if (active)
+    {
+      log.info("{} is already active!", service.getChatRoomId());
+      return;
+    }
+
+    log.info("{} is being activated", service.getChatRoomId());
+    this.sink = createSink();
+    active = true;
+  }
+
+  public void deactivate()
   {
-    log.info("{} is being closed", service.getChatRoomId());
+    log.info("{} is being deactivated", service.getChatRoomId());
+    active = false;
     sink.emitComplete(Sinks.EmitFailureHandler.FAIL_FAST);
   }