import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
import de.juplo.kafka.chat.backend.domain.Message;
import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
+import de.juplo.kafka.chat.backend.persistence.inmemory.ShardingStrategy;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
private final Path storagePath;
private final Clock clock;
private final int bufferSize;
+ private final ShardingStrategy shardingStrategy;
private final ChatRoomServiceFactory factory;
private final ObjectMapper mapper;
return Flux
.from(new JsonFilePublisher<ChatRoomInfoTo>(chatroomsPath(), mapper, type))
.log()
- .map(infoTo -> new ChatRoomInfo(
- infoTo.getId(),
- infoTo.getName(),
- infoTo.getShard()));
+ .map(chatRoomInfoTo ->
+ {
+ UUID chatRoomId = chatRoomInfoTo.getId();
+ int shard = shardingStrategy.selectShard(chatRoomId);
+
+ log.info(
+ "{} - old shard: {}, new shard: {}",
+ chatRoomId,
+ chatRoomInfoTo.getShard(),
+ shard);
+
+ return new ChatRoomInfo(
+ chatRoomId,
+ chatRoomInfoTo.getName(),
+ shard);
+ });
}
@Override