- .fromIterable(repository.findAll())
- .map(chatRoomTo -> new ChatRoom(
- UUID.fromString(chatRoomTo.getId()),
- chatRoomTo.getName(),
- clock,
- factory.create(
- Flux
- .fromIterable(chatRoomTo.getMessages())
- .map(messageTo -> messageTo.toMessage())),
- bufferSize));
+ .fromIterable(chatRoomRepository.findAll())
+ .map(chatRoomTo ->
+ {
+ UUID chatRoomId = UUID.fromString(chatRoomTo.getId());
+ int shard = shardingStrategy.selectShard(chatRoomId);
+
+ log.info(
+ "{} - old shard: {}, new shard: {}",
+ chatRoomId,
+ chatRoomTo.getShard(),
+ shard);
+
+ return new ChatRoomInfo(
+ chatRoomId,
+ chatRoomTo.getName(),
+ shard);
+ });
+ }
+
+ @Override
+ public void writeChatRoomData(UUID chatRoomId, Flux<Message> messageFlux)
+ {
+ messageFlux
+ .map(message -> MessageTo.from(chatRoomId, message))
+ .subscribe(messageTo -> messageRepository.save(messageTo));
+ }
+
+ @Override
+ public Flux<Message> readChatRoomData(UUID chatRoomId)
+ {
+ return Flux
+ .fromIterable(messageRepository.findByChatRoomIdOrderBySerialAsc(chatRoomId.toString()))
+ .map(messageTo -> messageTo.toMessage());