From: Kai Moritz Date: Mon, 9 Jan 2023 21:35:40 +0000 (+0100) Subject: refactor: `FileStorageStrategy` is not bound to `InMemoryChatRoomService` X-Git-Tag: wip-sharding~47 X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=eda6ad277fc798d5cf678efc7cf289cf768b2c0c;p=demos%2Fkafka%2Fchat refactor: `FileStorageStrategy` is not bound to `InMemoryChatRoomService` --- diff --git a/src/main/java/de/juplo/kafka/chat/backend/ChatBackendConfiguration.java b/src/main/java/de/juplo/kafka/chat/backend/ChatBackendConfiguration.java index c4a36e32..33734a73 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/ChatBackendConfiguration.java +++ b/src/main/java/de/juplo/kafka/chat/backend/ChatBackendConfiguration.java @@ -6,6 +6,7 @@ import de.juplo.kafka.chat.backend.domain.ChatHomeService; import de.juplo.kafka.chat.backend.persistence.inmemory.InMemoryChatHomeService; import de.juplo.kafka.chat.backend.persistence.filestorage.FileStorageStrategy; import de.juplo.kafka.chat.backend.persistence.StorageStrategy; +import de.juplo.kafka.chat.backend.persistence.inmemory.InMemoryChatRoomService; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @@ -46,6 +47,7 @@ public class ChatBackendConfiguration Paths.get(properties.getStorageDirectory()), clock, properties.getChatroomBufferSize(), + messageFlux -> new InMemoryChatRoomService(messageFlux), mapper); } diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/filestorage/ChatRoomServiceFactory.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/filestorage/ChatRoomServiceFactory.java new file mode 100644 index 00000000..42c000bb --- /dev/null +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/filestorage/ChatRoomServiceFactory.java @@ -0,0 +1,11 @@ +package de.juplo.kafka.chat.backend.persistence.filestorage; + +import de.juplo.kafka.chat.backend.domain.ChatRoomService; +import de.juplo.kafka.chat.backend.domain.Message; +import reactor.core.publisher.Flux; + + +public interface ChatRoomServiceFactory +{ + ChatRoomService create(Flux messageFlux); +} diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/filestorage/FileStorageStrategy.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/filestorage/FileStorageStrategy.java index 433e5f1f..9952117b 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/filestorage/FileStorageStrategy.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/filestorage/FileStorageStrategy.java @@ -8,7 +8,6 @@ import de.juplo.kafka.chat.backend.api.MessageTo; import de.juplo.kafka.chat.backend.domain.ChatRoom; import de.juplo.kafka.chat.backend.domain.Message; import de.juplo.kafka.chat.backend.persistence.StorageStrategy; -import de.juplo.kafka.chat.backend.persistence.inmemory.InMemoryChatRoomService; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import reactor.core.publisher.Flux; @@ -32,6 +31,7 @@ public class FileStorageStrategy implements StorageStrategy private final Path storagePath; private final Clock clock; private final int bufferSize; + private final ChatRoomServiceFactory factory; private final ObjectMapper mapper; @@ -102,17 +102,12 @@ public class FileStorageStrategy implements StorageStrategy return Flux .from(new JsonFilePublisher(chatroomsPath(), mapper, type)) .log() - .map(chatRoomTo -> - { - InMemoryChatRoomService chatroomService = - new InMemoryChatRoomService(readMessages(chatRoomTo)); - return new ChatRoom( - chatRoomTo.getId(), - chatRoomTo.getName(), - clock, - chatroomService, - bufferSize); - }); + .map(chatRoomTo -> new ChatRoom( + chatRoomTo.getId(), + chatRoomTo.getName(), + clock, + factory.create(readMessages(chatRoomTo)), + bufferSize)); } @Override diff --git a/src/test/java/de/juplo/kafka/chat/backend/persistence/InMemoryWithFileStorageStrategyIT.java b/src/test/java/de/juplo/kafka/chat/backend/persistence/InMemoryWithFileStorageStrategyIT.java index 7efa081a..e10aae03 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/persistence/InMemoryWithFileStorageStrategyIT.java +++ b/src/test/java/de/juplo/kafka/chat/backend/persistence/InMemoryWithFileStorageStrategyIT.java @@ -6,6 +6,7 @@ import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; import de.juplo.kafka.chat.backend.domain.ChatHomeService; import de.juplo.kafka.chat.backend.persistence.filestorage.FileStorageStrategy; import de.juplo.kafka.chat.backend.persistence.inmemory.InMemoryChatHomeService; +import de.juplo.kafka.chat.backend.persistence.inmemory.InMemoryChatRoomService; import lombok.extern.slf4j.Slf4j; import org.junit.jupiter.api.BeforeEach; @@ -33,8 +34,12 @@ public class InMemoryWithFileStorageStrategyIT extends AbstractStorageStrategyIT mapper = new ObjectMapper(); mapper.registerModule(new JavaTimeModule()); mapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS); - storageStrategy = new FileStorageStrategy(path, clock, 8, mapper); - + storageStrategy = new FileStorageStrategy( + path, + clock, + 8, + messageFlux -> new InMemoryChatRoomService(messageFlux), + mapper); }