X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fchat%2Fbackend%2Fpersistence%2Fstorage%2Ffiles%2FFilesStorageStrategy.java;h=24c6a01edd1356f32d3caffd5870e4bbb8c55aca;hb=ff98b068a91fc9e60e51bd4a95065633bb8ed2db;hp=9952117b3fc2adc78cadf6052cf51031a4e928d8;hpb=bc6e23259c4a822a961b00f221baf5f6018d73a6;p=demos%2Fkafka%2Fchat diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/files/FilesStorageStrategy.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/files/FilesStorageStrategy.java index 9952117b..24c6a01e 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/files/FilesStorageStrategy.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/files/FilesStorageStrategy.java @@ -1,10 +1,11 @@ -package de.juplo.kafka.chat.backend.persistence.filestorage; +package de.juplo.kafka.chat.backend.persistence.storage.files; import com.fasterxml.jackson.core.JsonGenerator; import com.fasterxml.jackson.databind.JavaType; import com.fasterxml.jackson.databind.ObjectMapper; import de.juplo.kafka.chat.backend.api.ChatRoomTo; import de.juplo.kafka.chat.backend.api.MessageTo; +import de.juplo.kafka.chat.backend.api.ShardingStrategy; import de.juplo.kafka.chat.backend.domain.ChatRoom; import de.juplo.kafka.chat.backend.domain.Message; import de.juplo.kafka.chat.backend.persistence.StorageStrategy; @@ -16,6 +17,7 @@ import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; import java.time.Clock; +import java.util.UUID; import static java.nio.file.StandardOpenOption.CREATE; import static java.nio.file.StandardOpenOption.TRUNCATE_EXISTING; @@ -23,7 +25,7 @@ import static java.nio.file.StandardOpenOption.TRUNCATE_EXISTING; @RequiredArgsConstructor @Slf4j -public class FileStorageStrategy implements StorageStrategy +public class FilesStorageStrategy implements StorageStrategy { public static final String CHATROOMS_FILENAME = "chatrooms.json"; @@ -31,12 +33,13 @@ public class FileStorageStrategy implements StorageStrategy private final Path storagePath; private final Clock clock; private final int bufferSize; + private final ShardingStrategy shardingStrategy; private final ChatRoomServiceFactory factory; private final ObjectMapper mapper; @Override - public void writeChatrooms(Flux chatroomFlux) + public void write(Flux chatroomFlux) { Path path = chatroomsPath(); log.info("Writing chatrooms to {}", path); @@ -96,21 +99,26 @@ public class FileStorageStrategy implements StorageStrategy } @Override - public Flux readChatrooms() + public Flux read() { JavaType type = mapper.getTypeFactory().constructType(ChatRoomTo.class); return Flux .from(new JsonFilePublisher(chatroomsPath(), mapper, type)) .log() - .map(chatRoomTo -> new ChatRoom( - chatRoomTo.getId(), - chatRoomTo.getName(), - clock, - factory.create(readMessages(chatRoomTo)), - bufferSize)); + .map(chatRoomTo -> + { + UUID chatRoomId = chatRoomTo.getId(); + int shard = shardingStrategy.selectShard(chatRoomId); + return new ChatRoom( + chatRoomTo.getId(), + chatRoomTo.getName(), + shard, + clock, + factory.create(readMessages(chatRoomTo)), + bufferSize); + }); } - @Override public void writeMessages(ChatRoomTo chatroomTo, Flux messageFlux) { Path path = chatroomPath(chatroomTo); @@ -169,7 +177,6 @@ public class FileStorageStrategy implements StorageStrategy } } - @Override public Flux readMessages(ChatRoomTo chatroomTo) { JavaType type = mapper.getTypeFactory().constructType(MessageTo.class);