]> juplo.de Git - demos/kafka/chat/commitdiff
WAS:TMP:IS?FIX:WIP:test: `*ConfigurationIT` asserts, if restored messages can be...
authorKai Moritz <kai@juplo.de>
Mon, 11 Mar 2024 12:08:21 +0000 (13:08 +0100)
committerKai Moritz <kai@juplo.de>
Sat, 16 Mar 2024 09:45:41 +0000 (10:45 +0100)
src/main/java/de/juplo/kafka/chat/backend/domain/ChatMessageService.java
src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoomData.java
src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/InMemoryChatMessageService.java
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatMessageService.java

index 640dc9e98fb3c8c7f86e9c99be5604f3485b9102..293a2409c2d8f6c1b10c37a260533b029a01f7f7 100644 (file)
@@ -4,10 +4,13 @@ import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
 import java.time.LocalDateTime;
+import java.util.UUID;
 
 
 public interface ChatMessageService
 {
+  UUID getChatRoomId();
+
   Mono<Message> persistMessage(
       Message.MessageKey key,
       LocalDateTime timestamp,
index 9dbeda9ed47d690dfb52561f7584647b6e7bf41a..bff56c1e763d2a7f9335b1e1acf52d4e2b345044 100644 (file)
@@ -106,6 +106,12 @@ public class ChatRoomData
     return service.getMessages(first, last);
   }
 
+  public void close()
+  {
+    log.info("{} is being closed", service.getChatRoomId());
+    sink.emitComplete(Sinks.EmitFailureHandler.FAIL_FAST);
+  }
+
   private Sinks.Many<Message> createSink()
   {
     return Sinks
index 5d5feb87845cfbd536ff6f8364b87583b01d63cc..a9a76a590543b7188999c35835ce0bfe1a620027 100644 (file)
@@ -3,6 +3,7 @@ package de.juplo.kafka.chat.backend.implementation.inmemory;
 import de.juplo.kafka.chat.backend.domain.ChatMessageService;
 import de.juplo.kafka.chat.backend.domain.Message;
 import de.juplo.kafka.chat.backend.implementation.StorageStrategy;
+import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
@@ -15,6 +16,7 @@ import java.util.UUID;
 @Slf4j
 public class InMemoryChatMessageService implements ChatMessageService
 {
+  @Getter
   private final UUID chatRoomId;
   private final LinkedHashMap<Message.MessageKey, Message> messages;
 
index 63f36f53d504ec2bb30219e253069f270d13dbb4..8dcc1bce8315323dc6e2e5f547087809351b9835 100644 (file)
@@ -176,6 +176,9 @@ public class DataChannel implements Channel, ConsumerRebalanceListener
     partitions.forEach(topicPartition ->
     {
       int partition = topicPartition.partition();
+      chatRoomData[partition]
+          .values()
+          .forEach(chatRoomData -> chatRoomData.close());
       isShardOwned[partition] = false;
       nextOffset[partition] = consumer.position(topicPartition);
       log.info("Partition revoked: {} - next={}", partition, nextOffset[partition]);
index 8ab50f1f966eecd453e356f641dfd8d2f3337c03..f93a534ab8305b14fce50930cb379541c942c911 100644 (file)
@@ -1,7 +1,9 @@
 package de.juplo.kafka.chat.backend.implementation.kafka;
 
 import de.juplo.kafka.chat.backend.domain.ChatMessageService;
-import de.juplo.kafka.chat.backend.domain.Message;import lombok.RequiredArgsConstructor;
+import de.juplo.kafka.chat.backend.domain.Message;
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
@@ -16,6 +18,7 @@ import java.util.UUID;
 public class KafkaChatMessageService implements ChatMessageService
 {
   private final DataChannel dataChannel;
+  @Getter
   private final UUID chatRoomId;
 
   private final LinkedHashMap<Message.MessageKey, Message> messages = new LinkedHashMap<>();