package de.juplo.kafka.chat.backend.persistence.storage.mongodb;
-import de.juplo.kafka.chat.backend.domain.ShardingStrategy;
-import de.juplo.kafka.chat.backend.domain.ChatRoom;
+import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
+import de.juplo.kafka.chat.backend.domain.Message;
+import de.juplo.kafka.chat.backend.persistence.ShardingStrategy;
import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
-import de.juplo.kafka.chat.backend.persistence.storage.files.ChatRoomServiceFactory;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
-import java.time.Clock;
import java.util.UUID;
@Slf4j
public class MongoDbStorageStrategy implements StorageStrategy
{
- private final ChatRoomRepository repository;
- private final Clock clock;
- private final int bufferSize;
+ private final ChatRoomRepository chatRoomRepository;
+ private final MessageRepository messageRepository;
private final ShardingStrategy shardingStrategy;
- private final ChatRoomServiceFactory factory;
@Override
- public void write(Flux<ChatRoom> chatroomFlux)
+ public void writeChatRoomInfo(Flux<ChatRoomInfo> chatRoomInfoFlux)
{
- chatroomFlux
+ chatRoomInfoFlux
.map(ChatRoomTo::from)
- .subscribe(chatroomTo -> repository.save(chatroomTo));
+ .subscribe(chatroomTo -> chatRoomRepository.save(chatroomTo));
}
@Override
- public Flux<ChatRoom> read()
+ public Flux<ChatRoomInfo> readChatRoomInfo()
{
return Flux
- .fromIterable(repository.findAll())
+ .fromIterable(chatRoomRepository.findAll())
.map(chatRoomTo ->
{
UUID chatRoomId = UUID.fromString(chatRoomTo.getId());
int shard = shardingStrategy.selectShard(chatRoomId);
- return new ChatRoom(
+
+ log.info(
+ "{} - old shard: {}, new shard: {}",
+ chatRoomId,
+ chatRoomTo.getShard(),
+ shard);
+
+ return new ChatRoomInfo(
chatRoomId,
chatRoomTo.getName(),
- shard,
- clock,
- factory.create(
- Flux
- .fromIterable(chatRoomTo.getMessages())
- .map(messageTo -> messageTo.toMessage())),
- bufferSize);
+ shard);
});
}
+
+ @Override
+ public void writeChatRoomData(UUID chatRoomId, Flux<Message> messageFlux)
+ {
+ messageFlux
+ .map(message -> MessageTo.from(chatRoomId, message))
+ .subscribe(messageTo -> messageRepository.save(messageTo));
+ }
+
+ @Override
+ public Flux<Message> readChatRoomData(UUID chatRoomId)
+ {
+ return Flux
+ .fromIterable(messageRepository.findByChatRoomIdOrderBySerialAsc(chatRoomId.toString()))
+ .map(messageTo -> messageTo.toMessage());
+ }
}