@Slf4j
public class MongoDbStorageStrategy implements StorageStrategy
{
- private final ChatRoomRepository repository;
+ private final ChatRoomRepository chatRoomRepository;
+ private final MessageRepository messageRepository;
private final Clock clock;
private final int bufferSize;
+ private final ShardingStrategy shardingStrategy;
private final ChatRoomServiceFactory factory;
{
chatRoomInfoFlux
.map(ChatRoomTo::from)
- .subscribe(chatroomTo -> repository.save(chatroomTo));
+ .subscribe(chatroomTo -> chatRoomRepository.save(chatroomTo));
}
@Override
public Flux<ChatRoomInfo> readChatRoomInfo()
{
return Flux
- .fromIterable(repository.findAll())
+ .fromIterable(chatRoomRepository.findAll())
.map(chatRoomTo ->
{
UUID chatRoomId = UUID.fromString(chatRoomTo.getId());
- return new ChatRoomData(
- clock,
- factory.create(
- Flux
- .fromIterable(chatRoomTo.getMessages())
- .map(messageTo -> messageTo.toMessage())),
- bufferSize);
+ int shard = shardingStrategy.selectShard(chatRoomId);
+
+ log.info(
+ "{} - old shard: {}, new shard: {}",
+ chatRoomId,
+ chatRoomTo.getShard(),
+ shard);
+
+ return new ChatRoomInfo(
+ chatRoomId,
+ chatRoomTo.getName(),
+ shard);
});
}
public void writeChatRoomData(Flux<ChatRoomData> chatRoomDataFlux)
{
chatRoomDataFlux
- .map(ChatRoomTo::from)
- .subscribe(chatroomTo -> repository.save(chatroomTo));
+ .flatMap(ChatRoomTo::from)
+ .subscribe(chatroomTo -> chatRoomRepository.save(chatroomTo));
}
@Override
public Flux<ChatRoomData> readChatRoomData()
{
return Flux
- .fromIterable(repository.findAll())
+ .fromIterable(chatRoomRepository.findAll())
.map(chatRoomTo ->
{
UUID chatRoomId = UUID.fromString(chatRoomTo.getId());