package de.juplo.kafka.chat.backend.persistence.storage.mongodb;
-import de.juplo.kafka.chat.backend.domain.ChatRoom;
+import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
+import de.juplo.kafka.chat.backend.domain.Message;
+import de.juplo.kafka.chat.backend.persistence.inmemory.ShardingStrategy;
import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
-import de.juplo.kafka.chat.backend.persistence.storage.files.ChatRoomServiceFactory;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
-import java.time.Clock;
import java.util.UUID;
@Slf4j
public class MongoDbStorageStrategy implements StorageStrategy
{
- private final ChatRoomRepository repository;
- private final Clock clock;
- private final int bufferSize;
- private final ChatRoomServiceFactory factory;
+ private final ChatRoomRepository chatRoomRepository;
+ private final MessageRepository messageRepository;
+ private final ShardingStrategy shardingStrategy;
@Override
- public void write(Flux<ChatRoom> chatroomFlux)
+ public void writeChatRoomInfo(Flux<ChatRoomInfo> chatRoomInfoFlux)
{
- chatroomFlux
+ chatRoomInfoFlux
.map(ChatRoomTo::from)
- .subscribe(chatroomTo -> repository.save(chatroomTo));
+ .subscribe(chatroomTo -> chatRoomRepository.save(chatroomTo));
}
@Override
- public Flux<ChatRoom> read()
+ public Flux<ChatRoomInfo> readChatRoomInfo()
{
return Flux
- .fromIterable(repository.findAll())
- .map(chatRoomTo -> new ChatRoom(
- UUID.fromString(chatRoomTo.getId()),
- chatRoomTo.getName(),
- clock,
- factory.create(
- Flux
- .fromIterable(chatRoomTo.getMessages())
- .map(messageTo -> messageTo.toMessage())),
- bufferSize));
+ .fromIterable(chatRoomRepository.findAll())
+ .map(chatRoomTo ->
+ {
+ UUID chatRoomId = UUID.fromString(chatRoomTo.getId());
+ int shard = shardingStrategy.selectShard(chatRoomId);
+
+ log.info(
+ "{} - old shard: {}, new shard: {}",
+ chatRoomId,
+ chatRoomTo.getShard(),
+ shard);
+
+ return new ChatRoomInfo(
+ chatRoomId,
+ chatRoomTo.getName(),
+ shard);
+ });
+ }
+
+ @Override
+ public void writeChatRoomData(UUID chatRoomId, Flux<Message> messageFlux)
+ {
+ messageFlux
+ .map(message -> MessageTo.from(chatRoomId, message))
+ .subscribe(messageTo -> messageRepository.save(messageTo));
+ }
+
+ @Override
+ public Flux<Message> readChatRoomData(UUID chatRoomId)
+ {
+ return Flux
+ .fromIterable(messageRepository.findByChatRoomIdOrderBySerialAsc(chatRoomId.toString()))
+ .map(messageTo -> messageTo.toMessage());
}
}