import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
 import de.juplo.kafka.chat.backend.domain.Message;
 import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
+import de.juplo.kafka.chat.backend.persistence.inmemory.ShardingStrategy;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import reactor.core.publisher.Flux;
   private final Path storagePath;
   private final Clock clock;
   private final int bufferSize;
+  private final ShardingStrategy shardingStrategy;
   private final ChatRoomServiceFactory factory;
   private final ObjectMapper mapper;
 
     return Flux
         .from(new JsonFilePublisher<ChatRoomInfoTo>(chatroomsPath(), mapper, type))
         .log()
-        .map(infoTo -> new ChatRoomInfo(
-            infoTo.getId(),
-            infoTo.getName(),
-            infoTo.getShard()));
+        .map(chatRoomInfoTo ->
+        {
+          UUID chatRoomId = chatRoomInfoTo.getId();
+          int shard = shardingStrategy.selectShard(chatRoomId);
+
+          log.info(
+              "{} - old shard: {}, new shard:  {}",
+              chatRoomId,
+              chatRoomInfoTo.getShard(),
+              shard);
+
+          return new ChatRoomInfo(
+              chatRoomId,
+              chatRoomInfoTo.getName(),
+              shard);
+        });
   }
 
   @Override