From 900422dccb5a92fbceac34caa5e614b0d7f05ad7 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Mon, 11 Mar 2024 15:57:06 +0100 Subject: [PATCH] TMP:test -- FIX: `ChatRoomData` active/inactive --- .../chat/backend/domain/ChatRoomData.java | 32 +++++++++++++------ .../exceptions/ChatRoomInactiveException.java | 19 +++++++++++ .../inmemory/SimpleChatHomeService.java | 8 +++-- .../implementation/kafka/DataChannel.java | 18 ++++++++--- 4 files changed, 61 insertions(+), 16 deletions(-) create mode 100644 src/main/java/de/juplo/kafka/chat/backend/domain/exceptions/ChatRoomInactiveException.java diff --git a/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoomData.java b/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoomData.java index bff56c1e..ca33aca5 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoomData.java +++ b/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoomData.java @@ -1,5 +1,6 @@ package de.juplo.kafka.chat.backend.domain; +import de.juplo.kafka.chat.backend.domain.exceptions.ChatRoomInactiveException; import de.juplo.kafka.chat.backend.domain.exceptions.InvalidUsernameException; import de.juplo.kafka.chat.backend.domain.exceptions.MessageMutationException; import lombok.extern.slf4j.Slf4j; @@ -23,6 +24,7 @@ public class ChatRoomData private final Clock clock; private final int historyLimit; private Sinks.Many sink; + private volatile boolean active = true; public ChatRoomData( @@ -37,7 +39,6 @@ public class ChatRoomData // @RequiredArgsConstructor unfortunately not possible, because // the `historyLimit` is not set, if `createSink()` is called // from the variable declaration! - this.sink = createSink(); } @@ -64,8 +65,8 @@ public class ChatRoomData sink.error(new MessageMutationException(existing, text)); } }) - .switchIfEmpty( - Mono + .switchIfEmpty(active + ? Mono .defer(() -> service.persistMessage(key, LocalDateTime.now(clock), text)) .doOnNext(m -> { @@ -74,7 +75,8 @@ public class ChatRoomData { log.warn("Emitting of message failed with {} for {}", result.name(), m); } - })); + }) + : Mono.error(new ChatRoomInactiveException(service.getChatRoomId()))); } @@ -91,9 +93,13 @@ public class ChatRoomData synchronized public Flux listen() { - return sink - .asFlux() - .doOnCancel(() -> sink = createSink()); // Sink hast to be recreated on auto-cancel! + return active + ? sink + .asFlux() + .doOnCancel(() -> sink = createSink()) // Sink hast to be recreated on auto-cancel! + : Flux + .error(new ChatRoomInactiveException(service.getChatRoomId())); + } public Flux getMessages() @@ -106,9 +112,17 @@ public class ChatRoomData return service.getMessages(first, last); } - public void close() + public void activate() + { + log.info("{} is being activated", service.getChatRoomId()); + this.sink = createSink(); + active = true; + } + + public void deactivate() { - log.info("{} is being closed", service.getChatRoomId()); + log.info("{} is being deactivated", service.getChatRoomId()); + active = false; sink.emitComplete(Sinks.EmitFailureHandler.FAIL_FAST); } diff --git a/src/main/java/de/juplo/kafka/chat/backend/domain/exceptions/ChatRoomInactiveException.java b/src/main/java/de/juplo/kafka/chat/backend/domain/exceptions/ChatRoomInactiveException.java new file mode 100644 index 00000000..9580c96e --- /dev/null +++ b/src/main/java/de/juplo/kafka/chat/backend/domain/exceptions/ChatRoomInactiveException.java @@ -0,0 +1,19 @@ +package de.juplo.kafka.chat.backend.domain.exceptions; + +import lombok.Getter; + +import java.util.UUID; + + +public class ChatRoomInactiveException extends IllegalStateException +{ + @Getter + private final UUID chatRoomId; + + + public ChatRoomInactiveException(UUID chatRoomId) + { + super("Chat-Room " + chatRoomId + " was closed."); + this.chatRoomId = chatRoomId; + } +} diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/SimpleChatHomeService.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/SimpleChatHomeService.java index 2aac0fac..8e3cc430 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/SimpleChatHomeService.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/SimpleChatHomeService.java @@ -76,12 +76,13 @@ public class SimpleChatHomeService implements ChatHomeService new InMemoryChatMessageService(chatRoomId); chatRoomInfo.put(chatRoomId, info); - chatRoomData.put( - info.getId(), + ChatRoomData chatRoomData = new ChatRoomData( clock, chatMessageService, - historyLimit)); + historyLimit); + chatRoomData.activate(); + this.chatRoomData.put(info.getId(), chatRoomData); return chatMessageService.restore(storageStrategy); }) @@ -100,6 +101,7 @@ public class SimpleChatHomeService implements ChatHomeService ChatRoomInfo chatRoomInfo = new ChatRoomInfo(id, name, shard); this.chatRoomInfo.put(id, chatRoomInfo); ChatRoomData chatRoomData = new ChatRoomData(clock, service, historyLimit); + chatRoomData.activate(); this.chatRoomData.put(id, chatRoomData); return Mono.just(chatRoomInfo); } diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java index 8dcc1bce..32a57206 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java @@ -144,6 +144,10 @@ public class DataChannel implements Channel, ConsumerRebalanceListener isShardOwned[partition] = true; this.currentOffset[partition] = currentOffset; + chatRoomData[partition] + .values() + .forEach(chatRoomData -> chatRoomData.activate()); + log.info( "Partition assigned: {} - loading messages: next={} -> current={}", partition, @@ -176,12 +180,15 @@ public class DataChannel implements Channel, ConsumerRebalanceListener partitions.forEach(topicPartition -> { int partition = topicPartition.partition(); - chatRoomData[partition] - .values() - .forEach(chatRoomData -> chatRoomData.close()); isShardOwned[partition] = false; nextOffset[partition] = consumer.position(topicPartition); + log.info("Partition revoked: {} - next={}", partition, nextOffset[partition]); + + chatRoomData[partition] + .values() + .forEach(chatRoomData -> chatRoomData.deactivate()); + channelMediator.shardRevoked(partition); }); } @@ -327,7 +334,10 @@ public class DataChannel implements Channel, ConsumerRebalanceListener void createChatRoomData(ChatRoomInfo chatRoomInfo) { - computeChatRoomData(chatRoomInfo.getId(), chatRoomInfo.getShard()); + ChatRoomData chatRoomData = computeChatRoomData( + chatRoomInfo.getId(), + chatRoomInfo.getShard()); + chatRoomData.activate(); } Mono getChatRoomData(int shard, UUID id) -- 2.20.1