TMP:test -- FIX: `ChatRoomData` active/inactive
authorKai Moritz <kai@juplo.de>
Mon, 11 Mar 2024 14:57:06 +0000 (15:57 +0100)
committerKai Moritz <kai@juplo.de>
Sat, 16 Mar 2024 09:45:41 +0000 (10:45 +0100)
src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoomData.java
src/main/java/de/juplo/kafka/chat/backend/domain/exceptions/ChatRoomInactiveException.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/SimpleChatHomeService.java
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java

index bff56c1..ca33aca 100644 (file)
@@ -1,5 +1,6 @@
 package de.juplo.kafka.chat.backend.domain;
 
+import de.juplo.kafka.chat.backend.domain.exceptions.ChatRoomInactiveException;
 import de.juplo.kafka.chat.backend.domain.exceptions.InvalidUsernameException;
 import de.juplo.kafka.chat.backend.domain.exceptions.MessageMutationException;
 import lombok.extern.slf4j.Slf4j;
@@ -23,6 +24,7 @@ public class ChatRoomData
   private final Clock clock;
   private final int historyLimit;
   private Sinks.Many<Message> sink;
+  private volatile boolean active = true;
 
 
   public ChatRoomData(
@@ -37,7 +39,6 @@ public class ChatRoomData
     // @RequiredArgsConstructor unfortunately not possible, because
     // the `historyLimit` is not set, if `createSink()` is called
     // from the variable declaration!
-    this.sink = createSink();
   }
 
 
@@ -64,8 +65,8 @@ public class ChatRoomData
             sink.error(new MessageMutationException(existing, text));
           }
         })
-        .switchIfEmpty(
-            Mono
+        .switchIfEmpty(active
+            Mono
                 .defer(() -> service.persistMessage(key, LocalDateTime.now(clock), text))
                 .doOnNext(m ->
                 {
@@ -74,7 +75,8 @@ public class ChatRoomData
                   {
                     log.warn("Emitting of message failed with {} for {}", result.name(), m);
                   }
-                }));
+                })
+            : Mono.error(new ChatRoomInactiveException(service.getChatRoomId())));
   }
 
 
@@ -91,9 +93,13 @@ public class ChatRoomData
 
   synchronized public Flux<Message> listen()
   {
-    return sink
-        .asFlux()
-        .doOnCancel(() -> sink = createSink()); // Sink hast to be recreated on auto-cancel!
+    return active
+        ? sink
+            .asFlux()
+            .doOnCancel(() -> sink = createSink()) // Sink hast to be recreated on auto-cancel!
+        : Flux
+            .error(new ChatRoomInactiveException(service.getChatRoomId()));
+
   }
 
   public Flux<Message> getMessages()
@@ -106,9 +112,17 @@ public class ChatRoomData
     return service.getMessages(first, last);
   }
 
-  public void close()
+  public void activate()
+  {
+    log.info("{} is being activated", service.getChatRoomId());
+    this.sink = createSink();
+    active = true;
+  }
+
+  public void deactivate()
   {
-    log.info("{} is being closed", service.getChatRoomId());
+    log.info("{} is being deactivated", service.getChatRoomId());
+    active = false;
     sink.emitComplete(Sinks.EmitFailureHandler.FAIL_FAST);
   }
 
diff --git a/src/main/java/de/juplo/kafka/chat/backend/domain/exceptions/ChatRoomInactiveException.java b/src/main/java/de/juplo/kafka/chat/backend/domain/exceptions/ChatRoomInactiveException.java
new file mode 100644 (file)
index 0000000..9580c96
--- /dev/null
@@ -0,0 +1,19 @@
+package de.juplo.kafka.chat.backend.domain.exceptions;
+
+import lombok.Getter;
+
+import java.util.UUID;
+
+
+public class ChatRoomInactiveException extends IllegalStateException
+{
+  @Getter
+  private final UUID chatRoomId;
+
+
+  public ChatRoomInactiveException(UUID chatRoomId)
+  {
+    super("Chat-Room " + chatRoomId + " was closed.");
+    this.chatRoomId = chatRoomId;
+  }
+}
index 2aac0fa..8e3cc43 100644 (file)
@@ -76,12 +76,13 @@ public class SimpleChatHomeService implements ChatHomeService
               new InMemoryChatMessageService(chatRoomId);
 
           chatRoomInfo.put(chatRoomId, info);
-          chatRoomData.put(
-              info.getId(),
+          ChatRoomData chatRoomData =
               new ChatRoomData(
                   clock,
                   chatMessageService,
-                  historyLimit));
+                  historyLimit);
+          chatRoomData.activate();
+          this.chatRoomData.put(info.getId(), chatRoomData);
 
           return chatMessageService.restore(storageStrategy);
         })
@@ -100,6 +101,7 @@ public class SimpleChatHomeService implements ChatHomeService
     ChatRoomInfo chatRoomInfo = new ChatRoomInfo(id, name, shard);
     this.chatRoomInfo.put(id, chatRoomInfo);
     ChatRoomData chatRoomData = new ChatRoomData(clock, service, historyLimit);
+    chatRoomData.activate();
     this.chatRoomData.put(id, chatRoomData);
     return Mono.just(chatRoomInfo);
   }
index 8dcc1bc..32a5720 100644 (file)
@@ -144,6 +144,10 @@ public class DataChannel implements Channel, ConsumerRebalanceListener
       isShardOwned[partition] =  true;
       this.currentOffset[partition] = currentOffset;
 
+      chatRoomData[partition]
+          .values()
+          .forEach(chatRoomData -> chatRoomData.activate());
+
       log.info(
           "Partition assigned: {} - loading messages: next={} -> current={}",
           partition,
@@ -176,12 +180,15 @@ public class DataChannel implements Channel, ConsumerRebalanceListener
     partitions.forEach(topicPartition ->
     {
       int partition = topicPartition.partition();
-      chatRoomData[partition]
-          .values()
-          .forEach(chatRoomData -> chatRoomData.close());
       isShardOwned[partition] = false;
       nextOffset[partition] = consumer.position(topicPartition);
+
       log.info("Partition revoked: {} - next={}", partition, nextOffset[partition]);
+
+      chatRoomData[partition]
+          .values()
+          .forEach(chatRoomData -> chatRoomData.deactivate());
+
       channelMediator.shardRevoked(partition);
     });
   }
@@ -327,7 +334,10 @@ public class DataChannel implements Channel, ConsumerRebalanceListener
 
   void createChatRoomData(ChatRoomInfo chatRoomInfo)
   {
-    computeChatRoomData(chatRoomInfo.getId(), chatRoomInfo.getShard());
+    ChatRoomData chatRoomData = computeChatRoomData(
+        chatRoomInfo.getId(),
+        chatRoomInfo.getShard());
+    chatRoomData.activate();
   }
 
   Mono<ChatRoomData> getChatRoomData(int shard, UUID id)