import reactor.core.publisher.Mono;
 
 import java.time.LocalDateTime;
+import java.util.UUID;
 
 
 public interface ChatMessageService
 {
+  UUID getChatRoomId();
+
   Mono<Message> persistMessage(
       Message.MessageKey key,
       LocalDateTime timestamp,
 
     return service.getMessages(first, last);
   }
 
+  public void close()
+  {
+    log.info("{} is being closed", service.getChatRoomId());
+    sink.emitComplete(Sinks.EmitFailureHandler.FAIL_FAST);
+  }
+
   private Sinks.Many<Message> createSink()
   {
     return Sinks
 
 import de.juplo.kafka.chat.backend.domain.ChatMessageService;
 import de.juplo.kafka.chat.backend.domain.Message;
 import de.juplo.kafka.chat.backend.implementation.StorageStrategy;
+import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 @Slf4j
 public class InMemoryChatMessageService implements ChatMessageService
 {
+  @Getter
   private final UUID chatRoomId;
   private final LinkedHashMap<Message.MessageKey, Message> messages;
 
 
     partitions.forEach(topicPartition ->
     {
       int partition = topicPartition.partition();
+      chatRoomData[partition]
+          .values()
+          .forEach(chatRoomData -> chatRoomData.close());
       isShardOwned[partition] = false;
       nextOffset[partition] = consumer.position(topicPartition);
       log.info("Partition revoked: {} - next={}", partition, nextOffset[partition]);
 
 package de.juplo.kafka.chat.backend.implementation.kafka;
 
 import de.juplo.kafka.chat.backend.domain.ChatMessageService;
-import de.juplo.kafka.chat.backend.domain.Message;import lombok.RequiredArgsConstructor;
+import de.juplo.kafka.chat.backend.domain.Message;
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 public class KafkaChatMessageService implements ChatMessageService
 {
   private final DataChannel dataChannel;
+  @Getter
   private final UUID chatRoomId;
 
   private final LinkedHashMap<Message.MessageKey, Message> messages = new LinkedHashMap<>();