From 075fbbf0c290748fa8b6d684c145a2f1ab8cbe0d Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Mon, 11 Mar 2024 13:08:21 +0100 Subject: [PATCH] WAS:TMP:IS?FIX:WIP:test: `*ConfigurationIT` asserts, if restored messages can be seen --- .../juplo/kafka/chat/backend/domain/ChatMessageService.java | 3 +++ .../de/juplo/kafka/chat/backend/domain/ChatRoomData.java | 6 ++++++ .../implementation/inmemory/InMemoryChatMessageService.java | 2 ++ .../chat/backend/implementation/kafka/DataChannel.java | 3 +++ .../implementation/kafka/KafkaChatMessageService.java | 5 ++++- 5 files changed, 18 insertions(+), 1 deletion(-) diff --git a/src/main/java/de/juplo/kafka/chat/backend/domain/ChatMessageService.java b/src/main/java/de/juplo/kafka/chat/backend/domain/ChatMessageService.java index 640dc9e9..293a2409 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/domain/ChatMessageService.java +++ b/src/main/java/de/juplo/kafka/chat/backend/domain/ChatMessageService.java @@ -4,10 +4,13 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import java.time.LocalDateTime; +import java.util.UUID; public interface ChatMessageService { + UUID getChatRoomId(); + Mono persistMessage( Message.MessageKey key, LocalDateTime timestamp, diff --git a/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoomData.java b/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoomData.java index 9dbeda9e..bff56c1e 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoomData.java +++ b/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoomData.java @@ -106,6 +106,12 @@ public class ChatRoomData return service.getMessages(first, last); } + public void close() + { + log.info("{} is being closed", service.getChatRoomId()); + sink.emitComplete(Sinks.EmitFailureHandler.FAIL_FAST); + } + private Sinks.Many createSink() { return Sinks diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/InMemoryChatMessageService.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/InMemoryChatMessageService.java index 5d5feb87..a9a76a59 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/InMemoryChatMessageService.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/InMemoryChatMessageService.java @@ -3,6 +3,7 @@ package de.juplo.kafka.chat.backend.implementation.inmemory; import de.juplo.kafka.chat.backend.domain.ChatMessageService; import de.juplo.kafka.chat.backend.domain.Message; import de.juplo.kafka.chat.backend.implementation.StorageStrategy; +import lombok.Getter; import lombok.extern.slf4j.Slf4j; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -15,6 +16,7 @@ import java.util.UUID; @Slf4j public class InMemoryChatMessageService implements ChatMessageService { + @Getter private final UUID chatRoomId; private final LinkedHashMap messages; diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java index 63f36f53..8dcc1bce 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java @@ -176,6 +176,9 @@ public class DataChannel implements Channel, ConsumerRebalanceListener partitions.forEach(topicPartition -> { int partition = topicPartition.partition(); + chatRoomData[partition] + .values() + .forEach(chatRoomData -> chatRoomData.close()); isShardOwned[partition] = false; nextOffset[partition] = consumer.position(topicPartition); log.info("Partition revoked: {} - next={}", partition, nextOffset[partition]); diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatMessageService.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatMessageService.java index 8ab50f1f..f93a534a 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatMessageService.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatMessageService.java @@ -1,7 +1,9 @@ package de.juplo.kafka.chat.backend.implementation.kafka; import de.juplo.kafka.chat.backend.domain.ChatMessageService; -import de.juplo.kafka.chat.backend.domain.Message;import lombok.RequiredArgsConstructor; +import de.juplo.kafka.chat.backend.domain.Message; +import lombok.Getter; +import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -16,6 +18,7 @@ import java.util.UUID; public class KafkaChatMessageService implements ChatMessageService { private final DataChannel dataChannel; + @Getter private final UUID chatRoomId; private final LinkedHashMap messages = new LinkedHashMap<>(); -- 2.20.1