import de.juplo.kafka.chat.backend.persistence.inmemory.InMemoryChatHomeService;
 import de.juplo.kafka.chat.backend.persistence.filestorage.FileStorageStrategy;
 import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
+import de.juplo.kafka.chat.backend.persistence.inmemory.InMemoryChatRoomService;
 import org.springframework.boot.context.properties.EnableConfigurationProperties;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
         Paths.get(properties.getStorageDirectory()),
         clock,
         properties.getChatroomBufferSize(),
+        messageFlux -> new InMemoryChatRoomService(messageFlux),
         mapper);
   }
 
 
--- /dev/null
+package de.juplo.kafka.chat.backend.persistence.filestorage;
+
+import de.juplo.kafka.chat.backend.domain.ChatRoomService;
+import de.juplo.kafka.chat.backend.domain.Message;
+import reactor.core.publisher.Flux;
+
+
+public interface ChatRoomServiceFactory
+{
+  ChatRoomService create(Flux<Message> messageFlux);
+}
 
 import de.juplo.kafka.chat.backend.domain.ChatRoom;
 import de.juplo.kafka.chat.backend.domain.Message;
 import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
-import de.juplo.kafka.chat.backend.persistence.inmemory.InMemoryChatRoomService;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import reactor.core.publisher.Flux;
   private final Path storagePath;
   private final Clock clock;
   private final int bufferSize;
+  private final ChatRoomServiceFactory factory;
   private final ObjectMapper mapper;
 
 
     return Flux
         .from(new JsonFilePublisher<ChatRoomTo>(chatroomsPath(), mapper, type))
         .log()
-        .map(chatRoomTo ->
-        {
-          InMemoryChatRoomService chatroomService =
-              new InMemoryChatRoomService(readMessages(chatRoomTo));
-          return new ChatRoom(
-              chatRoomTo.getId(),
-              chatRoomTo.getName(),
-              clock,
-              chatroomService,
-              bufferSize);
-        });
+        .map(chatRoomTo -> new ChatRoom(
+            chatRoomTo.getId(),
+            chatRoomTo.getName(),
+            clock,
+            factory.create(readMessages(chatRoomTo)),
+            bufferSize));
   }
 
   @Override
 
 import de.juplo.kafka.chat.backend.domain.ChatHomeService;
 import de.juplo.kafka.chat.backend.persistence.filestorage.FileStorageStrategy;
 import de.juplo.kafka.chat.backend.persistence.inmemory.InMemoryChatHomeService;
+import de.juplo.kafka.chat.backend.persistence.inmemory.InMemoryChatRoomService;
 import lombok.extern.slf4j.Slf4j;
 import org.junit.jupiter.api.BeforeEach;
 
     mapper = new ObjectMapper();
     mapper.registerModule(new JavaTimeModule());
     mapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
-    storageStrategy = new FileStorageStrategy(path, clock, 8, mapper);
-
+    storageStrategy = new FileStorageStrategy(
+        path,
+        clock,
+        8,
+        messageFlux -> new InMemoryChatRoomService(messageFlux),
+        mapper);
   }