]> juplo.de Git - demos/kafka/chat/commitdiff
fix: GREEN - Postponed the resetting of the sink
authorKai Moritz <kai@juplo.de>
Sun, 24 Mar 2024 19:34:07 +0000 (20:34 +0100)
committerKai Moritz <kai@juplo.de>
Mon, 25 Mar 2024 10:54:24 +0000 (11:54 +0100)
* The completion of a sink of an instance of `ChatRoomData`, that belongs
  to a revoked partition is postponed until the following partition-
  assignment happens.
* Also, the sink is only completed and recreated, if the partition it
  belongs to, was not assigned to the instance during
  `onPartitionsAssigned()` again.
* That way, listeners have a little more time to receive the messages, that
  were send shortly prior to the deactivation, and are not disturbed
  unnecessarily, if the partition is reassigned to the same instance again.
* *TODO:* Listeners still have to be enabled, to start listening on the
  responsible instance where they left of on the old instance, if they
  were to slow to receive all messages, before the sind was completed.

src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoomData.java
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java
src/test/java/de/juplo/kafka/chat/backend/domain/ChatRoomDataTest.java

index c88e1c30ab307b74999af33274e7deb80f3a5d9f..a1d5f57c3a48134e144f4f91491a0f816c485d55 100644 (file)
@@ -23,8 +23,9 @@ public class ChatRoomData
   private final ChatMessageService service;
   private final Clock clock;
   private final int historyLimit;
+
   private Sinks.Many<Message> sink;
-  private volatile boolean active = false;
+  private volatile boolean active;
 
 
   public ChatRoomData(
@@ -39,6 +40,8 @@ public class ChatRoomData
     // @RequiredArgsConstructor unfortunately not possible, because
     // the `historyLimit` is not set, if `createSink()` is called
     // from the variable declaration!
+    this.sink = createSink();
+    this.active = false;
   }
 
 
@@ -116,20 +119,37 @@ public class ChatRoomData
   {
     if (active)
     {
-      log.info("{} is already active!", service.getChatRoomInfo());
+      log.error("{} is already active!", service.getChatRoomInfo());
       return;
     }
 
     log.info("{} is being activated", service.getChatRoomInfo());
-    this.sink = createSink();
     active = true;
   }
 
   public void deactivate()
   {
+    if (!active)
+    {
+      log.error("{} is already inactive!", service.getChatRoomInfo());
+      return;
+    }
+
     log.info("{} is being deactivated", service.getChatRoomInfo());
     active = false;
+  }
+
+  public void reset()
+  {
+    ChatRoomInfo chatRoomInfo = service.getChatRoomInfo();
+    if (active)
+    {
+      throw new IllegalStateException("Could not reset active ChatRoomData: " + chatRoomInfo);
+    }
+
+    log.info("Resetting {}", chatRoomInfo);
     sink.emitComplete(Sinks.EmitFailureHandler.FAIL_FAST);
+    sink = createSink();
   }
 
   private Sinks.Many<Message> createSink()
index 327e9e6b235cd87e98c7637f6324085eb09b2438..57c9237122fd39da2cf1a863c7f0d6b982c187a9 100644 (file)
@@ -15,14 +15,12 @@ import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.WakeupException;
+import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 import reactor.util.retry.Retry;
 
 import java.time.*;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.UUID;
+import java.util.*;
 import java.util.stream.IntStream;
 
 
@@ -45,6 +43,7 @@ public class DataChannel implements Channel, ConsumerRebalanceListener
   private final Map<UUID, ChatRoomData>[] chatRoomData;
   private final ChannelMediator channelMediator;
   private final ShardingPublisherStrategy shardingPublisherStrategy;
+  private final List<ChatRoomData> previouslyOwnedChatRoomData = new LinkedList<>();
 
   private boolean running;
   @Getter
@@ -114,14 +113,14 @@ public class DataChannel implements Channel, ConsumerRebalanceListener
         {
           // On successful send
           Message message = new Message(key, metadata.offset(), timestamp, text);
-          log.info("Successfully send message {}", message);
+          log.info("Successfully sent message {} to chat-room {}", message, chatRoomId);
           sink.success(message);
         }
         else
         {
           // On send-failure
           log.error(
-              "Could not send message for chat-room={}, key={}, timestamp={}, text={}: {}",
+              "Could not sent message to chat-room={}, key={}, timestamp={}, text={}: {}",
               chatRoomId,
               key,
               timestamp,
@@ -169,6 +168,18 @@ public class DataChannel implements Channel, ConsumerRebalanceListener
     });
 
     consumer.resume(partitions);
+
+    Flux
+        .fromIterable(previouslyOwnedChatRoomData)
+        .filter(chatRoomData -> !isShardOwned[chatRoomData.getChatRoomService().getChatRoomInfo().getShard()])
+        .doOnNext(chatRoomData -> chatRoomData.reset())
+        .then()
+        .doOnSuccess(nothing ->
+        {
+          previouslyOwnedChatRoomData.clear();
+          log.info("Done resetting revoked ChatRoomData");
+        })
+        .block();
   }
 
   @Override
@@ -184,7 +195,11 @@ public class DataChannel implements Channel, ConsumerRebalanceListener
 
       chatRoomData[partition]
           .values()
-          .forEach(chatRoomData -> chatRoomData.deactivate());
+          .forEach(chatRoomData ->
+          {
+            chatRoomData.deactivate();
+            previouslyOwnedChatRoomData.add(chatRoomData);
+          });
 
       channelMediator.shardRevoked(partition);
     });
@@ -388,7 +403,7 @@ public class DataChannel implements Channel, ConsumerRebalanceListener
     }
     else
     {
-      log.info("Creating ChatRoomData {} with history-limit {}", chatRoomId, historyLimit);
+      log.info("Creating ChatRoomData {} with history-limit {}", chatRoomInfo, historyLimit);
       KafkaChatMessageService service = new KafkaChatMessageService(this, chatRoomInfo);
       chatRoomData = new ChatRoomData(clock, service, historyLimit);
       this.chatRoomData[shard].put(chatRoomId, chatRoomData);
index 57b8e59c96864df6092a302c0a1ad0f14618b920..8da6d5e22c34130bba8d3234253d72f1c47ff295 100644 (file)
@@ -369,8 +369,8 @@ public class ChatRoomDataTest
   }
 
   @Test
-  @DisplayName("Assert, that a listended to chat-room emits completed, if it is deactivated")
-  void testListenedToChatRoomEmitsCompletedIfItIsDeactivated()
+  @DisplayName("Assert, that a listened to chat-room emits completed, if it is deactivated and reset")
+  void testListenedToChatRoomEmitsCompletedIfItIsDeactivatedAndActivatedWithReset()
   {
     // Given
     Message message1 = new Message(key, 1l, timestamp, "#1");
@@ -393,6 +393,7 @@ public class ChatRoomDataTest
     // When
     Flux<Message> listenFlux = chatRoomData.listen();
     chatRoomData.deactivate();
+    chatRoomData.reset();
 
     // Then
     Awaitility