From 11fb12dd3b76efc8c47066331a55392f8b1eb4d9 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Mon, 11 Mar 2024 13:08:21 +0100 Subject: [PATCH] fix: GREEN - Implemented activation/deactivation of `ChatRoomData` * Introduced `volatile ChatRoomData#active`, which initially is `false`. * `ChatRoomData#listen()` throws `ChatRoomInactiveException` if inactive. * `ChatRoomData#addMessage(..)` throws `ChatRoomInactiveException` if inactive. * `SimpleChatHomeService` explicitly activates restored and newly created instances of `ChatRoomData`. * `DataChannel` explicitly activates instances of `ChatRoomData`, if they are restored during partition-assignment or, if a new chat-room is created. * `DataChannel` explicitly _deactivates_ instances of `ChatRoomData`, if the associated partition is revoked. * Also: Introduced `ChatMessageService#getChatRoomId()`. --- .../backend/domain/ChatMessageService.java | 3 ++ .../chat/backend/domain/ChatRoomData.java | 32 +++++++++++++++---- .../exceptions/ChatRoomInactiveException.java | 19 +++++++++++ .../inmemory/InMemoryChatMessageService.java | 2 ++ .../inmemory/SimpleChatHomeService.java | 8 +++-- .../implementation/kafka/DataChannel.java | 29 ++++++++++++++++- .../kafka/KafkaChatMessageService.java | 5 ++- 7 files changed, 86 insertions(+), 12 deletions(-) create mode 100644 src/main/java/de/juplo/kafka/chat/backend/domain/exceptions/ChatRoomInactiveException.java diff --git a/src/main/java/de/juplo/kafka/chat/backend/domain/ChatMessageService.java b/src/main/java/de/juplo/kafka/chat/backend/domain/ChatMessageService.java index 640dc9e9..293a2409 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/domain/ChatMessageService.java +++ b/src/main/java/de/juplo/kafka/chat/backend/domain/ChatMessageService.java @@ -4,10 +4,13 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import java.time.LocalDateTime; +import java.util.UUID; public interface ChatMessageService { + UUID getChatRoomId(); + Mono persistMessage( Message.MessageKey key, LocalDateTime timestamp, diff --git a/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoomData.java b/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoomData.java index 1edae4d7..20c046d8 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoomData.java +++ b/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoomData.java @@ -1,5 +1,6 @@ package de.juplo.kafka.chat.backend.domain; +import de.juplo.kafka.chat.backend.domain.exceptions.ChatRoomInactiveException; import de.juplo.kafka.chat.backend.domain.exceptions.InvalidUsernameException; import de.juplo.kafka.chat.backend.domain.exceptions.MessageMutationException; import lombok.extern.slf4j.Slf4j; @@ -23,6 +24,7 @@ public class ChatRoomData private final Clock clock; private final int historyLimit; private Sinks.Many sink; + private volatile boolean active = false; public ChatRoomData( @@ -37,7 +39,6 @@ public class ChatRoomData // @RequiredArgsConstructor unfortunately not possible, because // the `historyLimit` is not set, if `createSink()` is called // from the variable declaration! - this.sink = createSink(); } @@ -64,8 +65,8 @@ public class ChatRoomData sink.error(new MessageMutationException(existing, text)); } }) - .switchIfEmpty( - Mono + .switchIfEmpty(active + ? Mono .defer(() -> service.persistMessage(key, LocalDateTime.now(clock), text)) .doOnNext(m -> { @@ -74,7 +75,8 @@ public class ChatRoomData { log.warn("Emitting of message failed with {} for {}", result.name(), m); } - })); + }) + : Mono.error(new ChatRoomInactiveException(service.getChatRoomId()))); } @@ -91,9 +93,13 @@ public class ChatRoomData synchronized public Flux listen() { - return sink - .asFlux() - .doOnCancel(() -> sink = createSink()); // Sink hast to be recreated on auto-cancel! + return active + ? sink + .asFlux() + .doOnCancel(() -> sink = createSink()) // Sink hast to be recreated on auto-cancel! + : Flux + .error(new ChatRoomInactiveException(service.getChatRoomId())); + } public Flux getMessages() @@ -108,10 +114,22 @@ public class ChatRoomData public void activate() { + if (active) + { + log.info("{} is already active!", service.getChatRoomId()); + return; + } + + log.info("{} is being activated", service.getChatRoomId()); + this.sink = createSink(); + active = true; } public void deactivate() { + log.info("{} is being deactivated", service.getChatRoomId()); + active = false; + sink.emitComplete(Sinks.EmitFailureHandler.FAIL_FAST); } private Sinks.Many createSink() diff --git a/src/main/java/de/juplo/kafka/chat/backend/domain/exceptions/ChatRoomInactiveException.java b/src/main/java/de/juplo/kafka/chat/backend/domain/exceptions/ChatRoomInactiveException.java new file mode 100644 index 00000000..ca804e1c --- /dev/null +++ b/src/main/java/de/juplo/kafka/chat/backend/domain/exceptions/ChatRoomInactiveException.java @@ -0,0 +1,19 @@ +package de.juplo.kafka.chat.backend.domain.exceptions; + +import lombok.Getter; + +import java.util.UUID; + + +public class ChatRoomInactiveException extends IllegalStateException +{ + @Getter + private final UUID chatRoomId; + + + public ChatRoomInactiveException(UUID chatRoomId) + { + super("Chat-Room " + chatRoomId + " is currently inactive."); + this.chatRoomId = chatRoomId; + } +} diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/InMemoryChatMessageService.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/InMemoryChatMessageService.java index 5d5feb87..a9a76a59 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/InMemoryChatMessageService.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/InMemoryChatMessageService.java @@ -3,6 +3,7 @@ package de.juplo.kafka.chat.backend.implementation.inmemory; import de.juplo.kafka.chat.backend.domain.ChatMessageService; import de.juplo.kafka.chat.backend.domain.Message; import de.juplo.kafka.chat.backend.implementation.StorageStrategy; +import lombok.Getter; import lombok.extern.slf4j.Slf4j; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -15,6 +16,7 @@ import java.util.UUID; @Slf4j public class InMemoryChatMessageService implements ChatMessageService { + @Getter private final UUID chatRoomId; private final LinkedHashMap messages; diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/SimpleChatHomeService.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/SimpleChatHomeService.java index 2aac0fac..8e3cc430 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/SimpleChatHomeService.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/SimpleChatHomeService.java @@ -76,12 +76,13 @@ public class SimpleChatHomeService implements ChatHomeService new InMemoryChatMessageService(chatRoomId); chatRoomInfo.put(chatRoomId, info); - chatRoomData.put( - info.getId(), + ChatRoomData chatRoomData = new ChatRoomData( clock, chatMessageService, - historyLimit)); + historyLimit); + chatRoomData.activate(); + this.chatRoomData.put(info.getId(), chatRoomData); return chatMessageService.restore(storageStrategy); }) @@ -100,6 +101,7 @@ public class SimpleChatHomeService implements ChatHomeService ChatRoomInfo chatRoomInfo = new ChatRoomInfo(id, name, shard); this.chatRoomInfo.put(id, chatRoomInfo); ChatRoomData chatRoomData = new ChatRoomData(clock, service, historyLimit); + chatRoomData.activate(); this.chatRoomData.put(id, chatRoomData); return Mono.just(chatRoomInfo); } diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java index 63f36f53..abe51f4a 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java @@ -178,7 +178,13 @@ public class DataChannel implements Channel, ConsumerRebalanceListener int partition = topicPartition.partition(); isShardOwned[partition] = false; nextOffset[partition] = consumer.position(topicPartition); + log.info("Partition revoked: {} - next={}", partition, nextOffset[partition]); + + chatRoomData[partition] + .values() + .forEach(chatRoomData -> chatRoomData.deactivate()); + channelMediator.shardRevoked(partition); }); } @@ -213,6 +219,7 @@ public class DataChannel implements Channel, ConsumerRebalanceListener { log.info("Loading of messages completed! Pausing all owned partitions..."); pauseAllOwnedPartions(); + activateAllOwnedChatRooms(); log.info("Resuming normal operations..."); channelState = ChannelState.READY; } @@ -313,6 +320,16 @@ public class DataChannel implements Channel, ConsumerRebalanceListener .toList()); } + private void activateAllOwnedChatRooms() + { + IntStream + .range(0, numShards) + .filter(shard -> isShardOwned[shard]) + .forEach(shard -> chatRoomData[shard] + .values() + .forEach(chatRoomData -> chatRoomData.activate())); + } + int[] getOwnedShards() { @@ -324,7 +341,17 @@ public class DataChannel implements Channel, ConsumerRebalanceListener void createChatRoomData(ChatRoomInfo chatRoomInfo) { - computeChatRoomData(chatRoomInfo.getId(), chatRoomInfo.getShard()); + int shard = chatRoomInfo.getShard(); + + ChatRoomData chatRoomData = computeChatRoomData( + chatRoomInfo.getId(), + chatRoomInfo.getShard()); + + // TODO: Possible race-condition in case of an ongoing rebalance! + if (isShardOwned[shard]) + { + chatRoomData.activate(); + } } Mono getChatRoomData(int shard, UUID id) diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatMessageService.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatMessageService.java index 8ab50f1f..f93a534a 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatMessageService.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatMessageService.java @@ -1,7 +1,9 @@ package de.juplo.kafka.chat.backend.implementation.kafka; import de.juplo.kafka.chat.backend.domain.ChatMessageService; -import de.juplo.kafka.chat.backend.domain.Message;import lombok.RequiredArgsConstructor; +import de.juplo.kafka.chat.backend.domain.Message; +import lombok.Getter; +import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -16,6 +18,7 @@ import java.util.UUID; public class KafkaChatMessageService implements ChatMessageService { private final DataChannel dataChannel; + @Getter private final UUID chatRoomId; private final LinkedHashMap messages = new LinkedHashMap<>(); -- 2.20.1