WIP:refactor: Renamed `ChatRoom` into `ChatRoomData` - Aligned Code
authorKai Moritz <kai@juplo.de>
Sun, 3 Sep 2023 21:02:33 +0000 (23:02 +0200)
committerKai Moritz <kai@juplo.de>
Sun, 3 Sep 2023 21:02:33 +0000 (23:02 +0200)
src/main/java/de/juplo/kafka/chat/backend/persistence/storage/files/FilesStorageStrategy.java

index bcad3f3..ca0f851 100644 (file)
@@ -9,6 +9,7 @@ import de.juplo.kafka.chat.backend.domain.ChatRoomData;
 import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
 import de.juplo.kafka.chat.backend.domain.Message;
 import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
+import de.juplo.kafka.chat.backend.persistence.inmemory.ShardingStrategy;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import reactor.core.publisher.Flux;
@@ -33,6 +34,7 @@ public class FilesStorageStrategy implements StorageStrategy
   private final Path storagePath;
   private final Clock clock;
   private final int bufferSize;
+  private final ShardingStrategy shardingStrategy;
   private final ChatRoomServiceFactory factory;
   private final ObjectMapper mapper;
 
@@ -103,10 +105,22 @@ public class FilesStorageStrategy implements StorageStrategy
     return Flux
         .from(new JsonFilePublisher<ChatRoomInfoTo>(chatroomsPath(), mapper, type))
         .log()
-        .map(infoTo -> new ChatRoomInfo(
-            infoTo.getId(),
-            infoTo.getName(),
-            infoTo.getShard()));
+        .map(chatRoomInfoTo ->
+        {
+          UUID chatRoomId = chatRoomInfoTo.getId();
+          int shard = shardingStrategy.selectShard(chatRoomId);
+
+          log.info(
+              "{} - old shard: {}, new shard:  {}",
+              chatRoomId,
+              chatRoomInfoTo.getShard(),
+              shard);
+
+          return new ChatRoomInfo(
+              chatRoomId,
+              chatRoomInfoTo.getName(),
+              shard);
+        });
   }
 
   @Override