import de.juplo.kafka.chat.backend.persistence.inmemory.InMemoryChatHomeService;
import de.juplo.kafka.chat.backend.persistence.filestorage.FileStorageStrategy;
import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
+import de.juplo.kafka.chat.backend.persistence.inmemory.InMemoryChatRoomService;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
Paths.get(properties.getStorageDirectory()),
clock,
properties.getChatroomBufferSize(),
+ messageFlux -> new InMemoryChatRoomService(messageFlux),
mapper);
}
--- /dev/null
+package de.juplo.kafka.chat.backend.persistence.filestorage;
+
+import de.juplo.kafka.chat.backend.domain.ChatRoomService;
+import de.juplo.kafka.chat.backend.domain.Message;
+import reactor.core.publisher.Flux;
+
+
+public interface ChatRoomServiceFactory
+{
+ ChatRoomService create(Flux<Message> messageFlux);
+}
import de.juplo.kafka.chat.backend.domain.ChatRoom;
import de.juplo.kafka.chat.backend.domain.Message;
import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
-import de.juplo.kafka.chat.backend.persistence.inmemory.InMemoryChatRoomService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
private final Path storagePath;
private final Clock clock;
private final int bufferSize;
+ private final ChatRoomServiceFactory factory;
private final ObjectMapper mapper;
return Flux
.from(new JsonFilePublisher<ChatRoomTo>(chatroomsPath(), mapper, type))
.log()
- .map(chatRoomTo ->
- {
- InMemoryChatRoomService chatroomService =
- new InMemoryChatRoomService(readMessages(chatRoomTo));
- return new ChatRoom(
- chatRoomTo.getId(),
- chatRoomTo.getName(),
- clock,
- chatroomService,
- bufferSize);
- });
+ .map(chatRoomTo -> new ChatRoom(
+ chatRoomTo.getId(),
+ chatRoomTo.getName(),
+ clock,
+ factory.create(readMessages(chatRoomTo)),
+ bufferSize));
}
@Override
import de.juplo.kafka.chat.backend.domain.ChatHomeService;
import de.juplo.kafka.chat.backend.persistence.filestorage.FileStorageStrategy;
import de.juplo.kafka.chat.backend.persistence.inmemory.InMemoryChatHomeService;
+import de.juplo.kafka.chat.backend.persistence.inmemory.InMemoryChatRoomService;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.BeforeEach;
mapper = new ObjectMapper();
mapper.registerModule(new JavaTimeModule());
mapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
- storageStrategy =new FileStorageStrategy(path, clock, 8, mapper);
+ storageStrategy =new FileStorageStrategy(
+ path,
+ clock,
+ 8,
+ messageFlux -> new InMemoryChatRoomService(messageFlux),
+ mapper);
}