X-Git-Url: http://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fchat%2Fbackend%2Fpersistence%2FLocalJsonFilesStorageStrategy.java;h=7b490bf6c562ff8d242e84489d94221daada94d5;hb=af3553a0f5093819fc2af088b974375c7102ed9d;hp=6c6190876b85405efed0db054764dff18dacd701;hpb=c04279dc82b8e662f7a8408ff74f7acd9951cf72;p=demos%2Fkafka%2Fchat diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/LocalJsonFilesStorageStrategy.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/LocalJsonFilesStorageStrategy.java index 6c619087..7b490bf6 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/LocalJsonFilesStorageStrategy.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/LocalJsonFilesStorageStrategy.java @@ -3,9 +3,9 @@ package de.juplo.kafka.chat.backend.persistence; import com.fasterxml.jackson.core.JsonGenerator; import com.fasterxml.jackson.databind.JavaType; import com.fasterxml.jackson.databind.ObjectMapper; -import de.juplo.kafka.chat.backend.api.ChatroomTo; +import de.juplo.kafka.chat.backend.api.ChatRoomTo; import de.juplo.kafka.chat.backend.api.MessageTo; -import de.juplo.kafka.chat.backend.domain.Chatroom; +import de.juplo.kafka.chat.backend.domain.ChatRoom; import de.juplo.kafka.chat.backend.domain.Message; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; @@ -14,6 +14,7 @@ import reactor.core.publisher.Flux; import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; +import java.time.Clock; import static java.nio.file.StandardOpenOption.CREATE; import static java.nio.file.StandardOpenOption.TRUNCATE_EXISTING; @@ -27,12 +28,13 @@ public class LocalJsonFilesStorageStrategy implements StorageStrategy private final Path storagePath; + private final Clock clock; + private final int bufferSize; private final ObjectMapper mapper; - private final InMemoryChatroomFactory chatroomFactory; @Override - public void writeChatrooms(Flux chatroomFlux) + public void writeChatrooms(Flux chatroomFlux) { Path path = chatroomsPath(); log.info("Writing chatrooms to {}", path); @@ -75,7 +77,7 @@ public class LocalJsonFilesStorageStrategy implements StorageStrategy { try { - ChatroomTo chatroomTo = ChatroomTo.from(chatroom); + ChatRoomTo chatroomTo = ChatRoomTo.from(chatroom); generator.writeObject(chatroomTo); writeMessages(chatroomTo, chatroom.getMessages()); } @@ -92,25 +94,27 @@ public class LocalJsonFilesStorageStrategy implements StorageStrategy } @Override - public Flux readChatrooms() + public Flux readChatrooms() { - JavaType type = mapper.getTypeFactory().constructType(ChatroomTo.class); + JavaType type = mapper.getTypeFactory().constructType(ChatRoomTo.class); return Flux - .from(new JsonFilePublisher(chatroomsPath(), mapper, type)) + .from(new JsonFilePublisher(chatroomsPath(), mapper, type)) .log() - .map(chatroomTo -> + .map(chatRoomTo -> { - InMemoryChatroomService chatroomService = - new InMemoryChatroomService(readMessages(chatroomTo)); - return chatroomFactory.restoreChatroom( - chatroomTo.getId(), - chatroomTo.getName(), - chatroomService); + InMemoryChatRoomService chatroomService = + new InMemoryChatRoomService(readMessages(chatRoomTo)); + return new ChatRoom( + chatRoomTo.getId(), + chatRoomTo.getName(), + clock, + chatroomService, + bufferSize); }); } @Override - public void writeMessages(ChatroomTo chatroomTo, Flux messageFlux) + public void writeMessages(ChatRoomTo chatroomTo, Flux messageFlux) { Path path = chatroomPath(chatroomTo); log.info("Writing messages for {} to {}", chatroomTo, path); @@ -169,7 +173,7 @@ public class LocalJsonFilesStorageStrategy implements StorageStrategy } @Override - public Flux readMessages(ChatroomTo chatroomTo) + public Flux readMessages(ChatRoomTo chatroomTo) { JavaType type = mapper.getTypeFactory().constructType(MessageTo.class); return Flux @@ -183,7 +187,7 @@ public class LocalJsonFilesStorageStrategy implements StorageStrategy return storagePath.resolve(Path.of(CHATROOMS_FILENAME)); } - Path chatroomPath(ChatroomTo chatroomTo) + Path chatroomPath(ChatRoomTo chatroomTo) { return storagePath.resolve(Path.of(chatroomTo.getId().toString() + ".json")); }