From: Kai Moritz Date: Sun, 24 Mar 2024 19:34:07 +0000 (+0100) Subject: fix: GREEN - Postponed the resetting of the sink X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=eca4429d77ebcb8c2413136105a889a54f39f9b1;p=demos%2Fkafka%2Fchat fix: GREEN - Postponed the resetting of the sink * The completion of a sink of an instance of `ChatRoomData`, that belongs to a revoked partition is postponed until the following partition- assignment happens. * Also, the sink is only completed and recreated, if the partition it belongs to, was not assigned to the instance during `onPartitionsAssigned()` again. * That way, listeners have a little more time to receive the messages, that were send shortly prior to the deactivation, and are not disturbed unnecessarily, if the partition is reassigned to the same instance again. * *TODO:* Listeners still have to be enabled, to start listening on the responsible instance where they left of on the old instance, if they were to slow to receive all messages, before the sind was completed. --- diff --git a/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoomData.java b/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoomData.java index c88e1c30..a1d5f57c 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoomData.java +++ b/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoomData.java @@ -23,8 +23,9 @@ public class ChatRoomData private final ChatMessageService service; private final Clock clock; private final int historyLimit; + private Sinks.Many sink; - private volatile boolean active = false; + private volatile boolean active; public ChatRoomData( @@ -39,6 +40,8 @@ public class ChatRoomData // @RequiredArgsConstructor unfortunately not possible, because // the `historyLimit` is not set, if `createSink()` is called // from the variable declaration! + this.sink = createSink(); + this.active = false; } @@ -116,20 +119,37 @@ public class ChatRoomData { if (active) { - log.info("{} is already active!", service.getChatRoomInfo()); + log.error("{} is already active!", service.getChatRoomInfo()); return; } log.info("{} is being activated", service.getChatRoomInfo()); - this.sink = createSink(); active = true; } public void deactivate() { + if (!active) + { + log.error("{} is already inactive!", service.getChatRoomInfo()); + return; + } + log.info("{} is being deactivated", service.getChatRoomInfo()); active = false; + } + + public void reset() + { + ChatRoomInfo chatRoomInfo = service.getChatRoomInfo(); + if (active) + { + throw new IllegalStateException("Could not reset active ChatRoomData: " + chatRoomInfo); + } + + log.info("Resetting {}", chatRoomInfo); sink.emitComplete(Sinks.EmitFailureHandler.FAIL_FAST); + sink = createSink(); } private Sinks.Many createSink() diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java index 327e9e6b..57c92371 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java @@ -15,14 +15,12 @@ import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.WakeupException; +import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.util.retry.Retry; import java.time.*; -import java.util.Collection; -import java.util.HashMap; -import java.util.Map; -import java.util.UUID; +import java.util.*; import java.util.stream.IntStream; @@ -45,6 +43,7 @@ public class DataChannel implements Channel, ConsumerRebalanceListener private final Map[] chatRoomData; private final ChannelMediator channelMediator; private final ShardingPublisherStrategy shardingPublisherStrategy; + private final List previouslyOwnedChatRoomData = new LinkedList<>(); private boolean running; @Getter @@ -114,14 +113,14 @@ public class DataChannel implements Channel, ConsumerRebalanceListener { // On successful send Message message = new Message(key, metadata.offset(), timestamp, text); - log.info("Successfully send message {}", message); + log.info("Successfully sent message {} to chat-room {}", message, chatRoomId); sink.success(message); } else { // On send-failure log.error( - "Could not send message for chat-room={}, key={}, timestamp={}, text={}: {}", + "Could not sent message to chat-room={}, key={}, timestamp={}, text={}: {}", chatRoomId, key, timestamp, @@ -169,6 +168,18 @@ public class DataChannel implements Channel, ConsumerRebalanceListener }); consumer.resume(partitions); + + Flux + .fromIterable(previouslyOwnedChatRoomData) + .filter(chatRoomData -> !isShardOwned[chatRoomData.getChatRoomService().getChatRoomInfo().getShard()]) + .doOnNext(chatRoomData -> chatRoomData.reset()) + .then() + .doOnSuccess(nothing -> + { + previouslyOwnedChatRoomData.clear(); + log.info("Done resetting revoked ChatRoomData"); + }) + .block(); } @Override @@ -184,7 +195,11 @@ public class DataChannel implements Channel, ConsumerRebalanceListener chatRoomData[partition] .values() - .forEach(chatRoomData -> chatRoomData.deactivate()); + .forEach(chatRoomData -> + { + chatRoomData.deactivate(); + previouslyOwnedChatRoomData.add(chatRoomData); + }); channelMediator.shardRevoked(partition); }); @@ -388,7 +403,7 @@ public class DataChannel implements Channel, ConsumerRebalanceListener } else { - log.info("Creating ChatRoomData {} with history-limit {}", chatRoomId, historyLimit); + log.info("Creating ChatRoomData {} with history-limit {}", chatRoomInfo, historyLimit); KafkaChatMessageService service = new KafkaChatMessageService(this, chatRoomInfo); chatRoomData = new ChatRoomData(clock, service, historyLimit); this.chatRoomData[shard].put(chatRoomId, chatRoomData); diff --git a/src/test/java/de/juplo/kafka/chat/backend/domain/ChatRoomDataTest.java b/src/test/java/de/juplo/kafka/chat/backend/domain/ChatRoomDataTest.java index 57b8e59c..8da6d5e2 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/domain/ChatRoomDataTest.java +++ b/src/test/java/de/juplo/kafka/chat/backend/domain/ChatRoomDataTest.java @@ -369,8 +369,8 @@ public class ChatRoomDataTest } @Test - @DisplayName("Assert, that a listended to chat-room emits completed, if it is deactivated") - void testListenedToChatRoomEmitsCompletedIfItIsDeactivated() + @DisplayName("Assert, that a listened to chat-room emits completed, if it is deactivated and reset") + void testListenedToChatRoomEmitsCompletedIfItIsDeactivatedAndActivatedWithReset() { // Given Message message1 = new Message(key, 1l, timestamp, "#1"); @@ -393,6 +393,7 @@ public class ChatRoomDataTest // When Flux listenFlux = chatRoomData.listen(); chatRoomData.deactivate(); + chatRoomData.reset(); // Then Awaitility