X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fchat%2Fbackend%2Fpersistence%2Fstorage%2Ffiles%2FFilesStorageStrategy.java;h=9c791977135170d2024ebef87c026a62f33ff227;hb=e8d8cb2aba9988608ee98b0a7dfc1053b6429040;hp=9952117b3fc2adc78cadf6052cf51031a4e928d8;hpb=bc6e23259c4a822a961b00f221baf5f6018d73a6;p=demos%2Fkafka%2Fchat diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/files/FilesStorageStrategy.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/files/FilesStorageStrategy.java index 9952117b..9c791977 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/files/FilesStorageStrategy.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/files/FilesStorageStrategy.java @@ -1,13 +1,14 @@ -package de.juplo.kafka.chat.backend.persistence.filestorage; +package de.juplo.kafka.chat.backend.persistence.storage.files; import com.fasterxml.jackson.core.JsonGenerator; import com.fasterxml.jackson.databind.JavaType; import com.fasterxml.jackson.databind.ObjectMapper; -import de.juplo.kafka.chat.backend.api.ChatRoomTo; +import de.juplo.kafka.chat.backend.api.ChatRoomInfoTo; import de.juplo.kafka.chat.backend.api.MessageTo; -import de.juplo.kafka.chat.backend.domain.ChatRoom; +import de.juplo.kafka.chat.backend.domain.ChatRoomInfo; import de.juplo.kafka.chat.backend.domain.Message; import de.juplo.kafka.chat.backend.persistence.StorageStrategy; +import de.juplo.kafka.chat.backend.persistence.ShardingStrategy; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import reactor.core.publisher.Flux; @@ -15,7 +16,7 @@ import reactor.core.publisher.Flux; import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; -import java.time.Clock; +import java.util.UUID; import static java.nio.file.StandardOpenOption.CREATE; import static java.nio.file.StandardOpenOption.TRUNCATE_EXISTING; @@ -23,20 +24,18 @@ import static java.nio.file.StandardOpenOption.TRUNCATE_EXISTING; @RequiredArgsConstructor @Slf4j -public class FileStorageStrategy implements StorageStrategy +public class FilesStorageStrategy implements StorageStrategy { public static final String CHATROOMS_FILENAME = "chatrooms.json"; private final Path storagePath; - private final Clock clock; - private final int bufferSize; - private final ChatRoomServiceFactory factory; + private final ShardingStrategy shardingStrategy; private final ObjectMapper mapper; @Override - public void writeChatrooms(Flux chatroomFlux) + public void writeChatRoomInfo(Flux chatRoomInfoFlux) { Path path = chatroomsPath(); log.info("Writing chatrooms to {}", path); @@ -49,7 +48,7 @@ public class FileStorageStrategy implements StorageStrategy .getFactory() .createGenerator(Files.newBufferedWriter(path, CREATE, TRUNCATE_EXISTING)); - chatroomFlux + chatRoomInfoFlux .log() .doFirst(() -> { @@ -75,13 +74,12 @@ public class FileStorageStrategy implements StorageStrategy throw new RuntimeException(e); } }) - .subscribe(chatroom -> + .subscribe(chatRoomInfo -> { try { - ChatRoomTo chatroomTo = ChatRoomTo.from(chatroom); - generator.writeObject(chatroomTo); - writeMessages(chatroomTo, chatroom.getMessages()); + ChatRoomInfoTo chatRoomInfoTo = ChatRoomInfoTo.from(chatRoomInfo); + generator.writeObject(chatRoomInfoTo); } catch (IOException e) { @@ -96,25 +94,37 @@ public class FileStorageStrategy implements StorageStrategy } @Override - public Flux readChatrooms() + public Flux readChatRoomInfo() { - JavaType type = mapper.getTypeFactory().constructType(ChatRoomTo.class); + JavaType type = mapper.getTypeFactory().constructType(ChatRoomInfoTo.class); return Flux - .from(new JsonFilePublisher(chatroomsPath(), mapper, type)) + .from(new JsonFilePublisher(chatroomsPath(), mapper, type)) .log() - .map(chatRoomTo -> new ChatRoom( - chatRoomTo.getId(), - chatRoomTo.getName(), - clock, - factory.create(readMessages(chatRoomTo)), - bufferSize)); + .map(chatRoomInfoTo -> + { + UUID chatRoomId = chatRoomInfoTo.getId(); + int shard = shardingStrategy.selectShard(chatRoomId); + + log.info( + "{} - old shard: {}, new shard: {}", + chatRoomId, + chatRoomInfoTo.getShard(), + shard); + + return new ChatRoomInfo( + chatRoomId, + chatRoomInfoTo.getName(), + shard); + }); } @Override - public void writeMessages(ChatRoomTo chatroomTo, Flux messageFlux) + public void writeChatRoomData( + UUID chatRoomId, + Flux messageFlux) { - Path path = chatroomPath(chatroomTo); - log.info("Writing messages for {} to {}", chatroomTo, path); + Path path = chatroomPath(chatRoomId); + log.info("Writing messages for {} to {}", chatRoomId, path); try { Files.createDirectories(storagePath); @@ -170,11 +180,11 @@ public class FileStorageStrategy implements StorageStrategy } @Override - public Flux readMessages(ChatRoomTo chatroomTo) + public Flux readChatRoomData(UUID chatRoomId) { JavaType type = mapper.getTypeFactory().constructType(MessageTo.class); return Flux - .from(new JsonFilePublisher(chatroomPath(chatroomTo), mapper, type)) + .from(new JsonFilePublisher(chatroomPath(chatRoomId), mapper, type)) .log() .map(MessageTo::toMessage); } @@ -184,8 +194,8 @@ public class FileStorageStrategy implements StorageStrategy return storagePath.resolve(Path.of(CHATROOMS_FILENAME)); } - Path chatroomPath(ChatRoomTo chatroomTo) + Path chatroomPath(UUID id) { - return storagePath.resolve(Path.of(chatroomTo.getId().toString() + ".json")); + return storagePath.resolve(Path.of(id.toString() + ".json")); } }