From: Kai Moritz Date: Sun, 3 Sep 2023 21:02:33 +0000 (+0200) Subject: WIP:refactor: Renamed `ChatRoom` into `ChatRoomData` - Aligned Code X-Git-Tag: rebase--2023-09-05--23-53~24 X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=a6a2e032e8e5f2e378335889a0a307d6ae6b29c4;p=demos%2Fkafka%2Fchat WIP:refactor: Renamed `ChatRoom` into `ChatRoomData` - Aligned Code --- diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/files/FilesStorageStrategy.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/files/FilesStorageStrategy.java index bcad3f37..ca0f851b 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/files/FilesStorageStrategy.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/files/FilesStorageStrategy.java @@ -9,6 +9,7 @@ import de.juplo.kafka.chat.backend.domain.ChatRoomData; import de.juplo.kafka.chat.backend.domain.ChatRoomInfo; import de.juplo.kafka.chat.backend.domain.Message; import de.juplo.kafka.chat.backend.persistence.StorageStrategy; +import de.juplo.kafka.chat.backend.persistence.inmemory.ShardingStrategy; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import reactor.core.publisher.Flux; @@ -33,6 +34,7 @@ public class FilesStorageStrategy implements StorageStrategy private final Path storagePath; private final Clock clock; private final int bufferSize; + private final ShardingStrategy shardingStrategy; private final ChatRoomServiceFactory factory; private final ObjectMapper mapper; @@ -103,10 +105,22 @@ public class FilesStorageStrategy implements StorageStrategy return Flux .from(new JsonFilePublisher(chatroomsPath(), mapper, type)) .log() - .map(infoTo -> new ChatRoomInfo( - infoTo.getId(), - infoTo.getName(), - infoTo.getShard())); + .map(chatRoomInfoTo -> + { + UUID chatRoomId = chatRoomInfoTo.getId(); + int shard = shardingStrategy.selectShard(chatRoomId); + + log.info( + "{} - old shard: {}, new shard: {}", + chatRoomId, + chatRoomInfoTo.getShard(), + shard); + + return new ChatRoomInfo( + chatRoomId, + chatRoomInfoTo.getName(), + shard); + }); } @Override