fix: GREEN - Implemented activation/deactivation of `ChatRoomData`
authorKai Moritz <kai@juplo.de>
Mon, 11 Mar 2024 12:08:21 +0000 (13:08 +0100)
committerKai Moritz <kai@juplo.de>
Fri, 22 Mar 2024 16:39:20 +0000 (17:39 +0100)
* Introduced `volatile ChatRoomData#active`, which initially is `false`.
* `ChatRoomData#listen()` throws `ChatRoomInactiveException` if inactive.
* `ChatRoomData#addMessage(..)` throws `ChatRoomInactiveException` if
  inactive.
* `SimpleChatHomeService` explicitly activates restored and newly created
  instances of `ChatRoomData`.
* `DataChannel` explicitly activates instances of `ChatRoomData`, if
  they are restored during partition-assignment or, if a new chat-room
  is created.
* `DataChannel` explicitly _deactivates_ instances of `ChatRoomData`,
  if the associated partition is revoked.
* Also: Introduced `ChatMessageService#getChatRoomId()`.

src/main/java/de/juplo/kafka/chat/backend/domain/ChatMessageService.java
src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoomData.java
src/main/java/de/juplo/kafka/chat/backend/domain/exceptions/ChatRoomInactiveException.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/InMemoryChatMessageService.java
src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/SimpleChatHomeService.java
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatMessageService.java

index 640dc9e..293a240 100644 (file)
@@ -4,10 +4,13 @@ import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
 import java.time.LocalDateTime;
+import java.util.UUID;
 
 
 public interface ChatMessageService
 {
+  UUID getChatRoomId();
+
   Mono<Message> persistMessage(
       Message.MessageKey key,
       LocalDateTime timestamp,
index 1edae4d..20c046d 100644 (file)
@@ -1,5 +1,6 @@
 package de.juplo.kafka.chat.backend.domain;
 
+import de.juplo.kafka.chat.backend.domain.exceptions.ChatRoomInactiveException;
 import de.juplo.kafka.chat.backend.domain.exceptions.InvalidUsernameException;
 import de.juplo.kafka.chat.backend.domain.exceptions.MessageMutationException;
 import lombok.extern.slf4j.Slf4j;
@@ -23,6 +24,7 @@ public class ChatRoomData
   private final Clock clock;
   private final int historyLimit;
   private Sinks.Many<Message> sink;
+  private volatile boolean active = false;
 
 
   public ChatRoomData(
@@ -37,7 +39,6 @@ public class ChatRoomData
     // @RequiredArgsConstructor unfortunately not possible, because
     // the `historyLimit` is not set, if `createSink()` is called
     // from the variable declaration!
-    this.sink = createSink();
   }
 
 
@@ -64,8 +65,8 @@ public class ChatRoomData
             sink.error(new MessageMutationException(existing, text));
           }
         })
-        .switchIfEmpty(
-            Mono
+        .switchIfEmpty(active
+            Mono
                 .defer(() -> service.persistMessage(key, LocalDateTime.now(clock), text))
                 .doOnNext(m ->
                 {
@@ -74,7 +75,8 @@ public class ChatRoomData
                   {
                     log.warn("Emitting of message failed with {} for {}", result.name(), m);
                   }
-                }));
+                })
+            : Mono.error(new ChatRoomInactiveException(service.getChatRoomId())));
   }
 
 
@@ -91,9 +93,13 @@ public class ChatRoomData
 
   synchronized public Flux<Message> listen()
   {
-    return sink
-        .asFlux()
-        .doOnCancel(() -> sink = createSink()); // Sink hast to be recreated on auto-cancel!
+    return active
+        ? sink
+            .asFlux()
+            .doOnCancel(() -> sink = createSink()) // Sink hast to be recreated on auto-cancel!
+        : Flux
+            .error(new ChatRoomInactiveException(service.getChatRoomId()));
+
   }
 
   public Flux<Message> getMessages()
@@ -108,10 +114,22 @@ public class ChatRoomData
 
   public void activate()
   {
+    if (active)
+    {
+      log.info("{} is already active!", service.getChatRoomId());
+      return;
+    }
+
+    log.info("{} is being activated", service.getChatRoomId());
+    this.sink = createSink();
+    active = true;
   }
 
   public void deactivate()
   {
+    log.info("{} is being deactivated", service.getChatRoomId());
+    active = false;
+    sink.emitComplete(Sinks.EmitFailureHandler.FAIL_FAST);
   }
 
   private Sinks.Many<Message> createSink()
diff --git a/src/main/java/de/juplo/kafka/chat/backend/domain/exceptions/ChatRoomInactiveException.java b/src/main/java/de/juplo/kafka/chat/backend/domain/exceptions/ChatRoomInactiveException.java
new file mode 100644 (file)
index 0000000..ca804e1
--- /dev/null
@@ -0,0 +1,19 @@
+package de.juplo.kafka.chat.backend.domain.exceptions;
+
+import lombok.Getter;
+
+import java.util.UUID;
+
+
+public class ChatRoomInactiveException extends IllegalStateException
+{
+  @Getter
+  private final UUID chatRoomId;
+
+
+  public ChatRoomInactiveException(UUID chatRoomId)
+  {
+    super("Chat-Room " + chatRoomId + " is currently inactive.");
+    this.chatRoomId = chatRoomId;
+  }
+}
index 5d5feb8..a9a76a5 100644 (file)
@@ -3,6 +3,7 @@ package de.juplo.kafka.chat.backend.implementation.inmemory;
 import de.juplo.kafka.chat.backend.domain.ChatMessageService;
 import de.juplo.kafka.chat.backend.domain.Message;
 import de.juplo.kafka.chat.backend.implementation.StorageStrategy;
+import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
@@ -15,6 +16,7 @@ import java.util.UUID;
 @Slf4j
 public class InMemoryChatMessageService implements ChatMessageService
 {
+  @Getter
   private final UUID chatRoomId;
   private final LinkedHashMap<Message.MessageKey, Message> messages;
 
index 2aac0fa..8e3cc43 100644 (file)
@@ -76,12 +76,13 @@ public class SimpleChatHomeService implements ChatHomeService
               new InMemoryChatMessageService(chatRoomId);
 
           chatRoomInfo.put(chatRoomId, info);
-          chatRoomData.put(
-              info.getId(),
+          ChatRoomData chatRoomData =
               new ChatRoomData(
                   clock,
                   chatMessageService,
-                  historyLimit));
+                  historyLimit);
+          chatRoomData.activate();
+          this.chatRoomData.put(info.getId(), chatRoomData);
 
           return chatMessageService.restore(storageStrategy);
         })
@@ -100,6 +101,7 @@ public class SimpleChatHomeService implements ChatHomeService
     ChatRoomInfo chatRoomInfo = new ChatRoomInfo(id, name, shard);
     this.chatRoomInfo.put(id, chatRoomInfo);
     ChatRoomData chatRoomData = new ChatRoomData(clock, service, historyLimit);
+    chatRoomData.activate();
     this.chatRoomData.put(id, chatRoomData);
     return Mono.just(chatRoomInfo);
   }
index 63f36f5..abe51f4 100644 (file)
@@ -178,7 +178,13 @@ public class DataChannel implements Channel, ConsumerRebalanceListener
       int partition = topicPartition.partition();
       isShardOwned[partition] = false;
       nextOffset[partition] = consumer.position(topicPartition);
+
       log.info("Partition revoked: {} - next={}", partition, nextOffset[partition]);
+
+      chatRoomData[partition]
+          .values()
+          .forEach(chatRoomData -> chatRoomData.deactivate());
+
       channelMediator.shardRevoked(partition);
     });
   }
@@ -213,6 +219,7 @@ public class DataChannel implements Channel, ConsumerRebalanceListener
             {
               log.info("Loading of messages completed! Pausing all owned partitions...");
               pauseAllOwnedPartions();
+              activateAllOwnedChatRooms();
               log.info("Resuming normal operations...");
               channelState = ChannelState.READY;
             }
@@ -313,6 +320,16 @@ public class DataChannel implements Channel, ConsumerRebalanceListener
         .toList());
   }
 
+  private void activateAllOwnedChatRooms()
+  {
+    IntStream
+        .range(0, numShards)
+        .filter(shard -> isShardOwned[shard])
+        .forEach(shard -> chatRoomData[shard]
+            .values()
+            .forEach(chatRoomData -> chatRoomData.activate()));
+  }
+
 
   int[] getOwnedShards()
   {
@@ -324,7 +341,17 @@ public class DataChannel implements Channel, ConsumerRebalanceListener
 
   void createChatRoomData(ChatRoomInfo chatRoomInfo)
   {
-    computeChatRoomData(chatRoomInfo.getId(), chatRoomInfo.getShard());
+    int shard = chatRoomInfo.getShard();
+
+    ChatRoomData chatRoomData = computeChatRoomData(
+        chatRoomInfo.getId(),
+        chatRoomInfo.getShard());
+
+    // TODO: Possible race-condition in case of an ongoing rebalance!
+    if (isShardOwned[shard])
+    {
+      chatRoomData.activate();
+    }
   }
 
   Mono<ChatRoomData> getChatRoomData(int shard, UUID id)
index 8ab50f1..f93a534 100644 (file)
@@ -1,7 +1,9 @@
 package de.juplo.kafka.chat.backend.implementation.kafka;
 
 import de.juplo.kafka.chat.backend.domain.ChatMessageService;
-import de.juplo.kafka.chat.backend.domain.Message;import lombok.RequiredArgsConstructor;
+import de.juplo.kafka.chat.backend.domain.Message;
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
@@ -16,6 +18,7 @@ import java.util.UUID;
 public class KafkaChatMessageService implements ChatMessageService
 {
   private final DataChannel dataChannel;
+  @Getter
   private final UUID chatRoomId;
 
   private final LinkedHashMap<Message.MessageKey, Message> messages = new LinkedHashMap<>();