WAS:TMP:IS?FIX:WIP:test: `*ConfigurationIT` asserts, if restored messages can be...
authorKai Moritz <kai@juplo.de>
Mon, 11 Mar 2024 12:08:21 +0000 (13:08 +0100)
committerKai Moritz <kai@juplo.de>
Sat, 16 Mar 2024 09:45:41 +0000 (10:45 +0100)
src/main/java/de/juplo/kafka/chat/backend/domain/ChatMessageService.java
src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoomData.java
src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/InMemoryChatMessageService.java
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatMessageService.java

index 640dc9e..293a240 100644 (file)
@@ -4,10 +4,13 @@ import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
 import java.time.LocalDateTime;
+import java.util.UUID;
 
 
 public interface ChatMessageService
 {
+  UUID getChatRoomId();
+
   Mono<Message> persistMessage(
       Message.MessageKey key,
       LocalDateTime timestamp,
index 9dbeda9..bff56c1 100644 (file)
@@ -106,6 +106,12 @@ public class ChatRoomData
     return service.getMessages(first, last);
   }
 
+  public void close()
+  {
+    log.info("{} is being closed", service.getChatRoomId());
+    sink.emitComplete(Sinks.EmitFailureHandler.FAIL_FAST);
+  }
+
   private Sinks.Many<Message> createSink()
   {
     return Sinks
index 5d5feb8..a9a76a5 100644 (file)
@@ -3,6 +3,7 @@ package de.juplo.kafka.chat.backend.implementation.inmemory;
 import de.juplo.kafka.chat.backend.domain.ChatMessageService;
 import de.juplo.kafka.chat.backend.domain.Message;
 import de.juplo.kafka.chat.backend.implementation.StorageStrategy;
+import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
@@ -15,6 +16,7 @@ import java.util.UUID;
 @Slf4j
 public class InMemoryChatMessageService implements ChatMessageService
 {
+  @Getter
   private final UUID chatRoomId;
   private final LinkedHashMap<Message.MessageKey, Message> messages;
 
index 63f36f5..8dcc1bc 100644 (file)
@@ -176,6 +176,9 @@ public class DataChannel implements Channel, ConsumerRebalanceListener
     partitions.forEach(topicPartition ->
     {
       int partition = topicPartition.partition();
+      chatRoomData[partition]
+          .values()
+          .forEach(chatRoomData -> chatRoomData.close());
       isShardOwned[partition] = false;
       nextOffset[partition] = consumer.position(topicPartition);
       log.info("Partition revoked: {} - next={}", partition, nextOffset[partition]);
index 8ab50f1..f93a534 100644 (file)
@@ -1,7 +1,9 @@
 package de.juplo.kafka.chat.backend.implementation.kafka;
 
 import de.juplo.kafka.chat.backend.domain.ChatMessageService;
-import de.juplo.kafka.chat.backend.domain.Message;import lombok.RequiredArgsConstructor;
+import de.juplo.kafka.chat.backend.domain.Message;
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
@@ -16,6 +18,7 @@ import java.util.UUID;
 public class KafkaChatMessageService implements ChatMessageService
 {
   private final DataChannel dataChannel;
+  @Getter
   private final UUID chatRoomId;
 
   private final LinkedHashMap<Message.MessageKey, Message> messages = new LinkedHashMap<>();