-package de.juplo.kafka.chat.backend.persistence.storage.mongodb;
+package de.juplo.kafka.chat.backend.storage.mongodb;
import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
import de.juplo.kafka.chat.backend.domain.Message;
-import de.juplo.kafka.chat.backend.persistence.ShardingStrategy;
-import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
+import de.juplo.kafka.chat.backend.implementation.StorageStrategy;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
import java.util.UUID;
+import java.util.logging.Level;
@RequiredArgsConstructor
{
private final ChatRoomRepository chatRoomRepository;
private final MessageRepository messageRepository;
- private final ShardingStrategy shardingStrategy;
+ private final String loggingCategory = MongoDbStorageStrategy.class.getSimpleName();
+ private final Level loggingLevel;
+ private final boolean showOperatorLine;
@Override
- public void writeChatRoomInfo(Flux<ChatRoomInfo> chatRoomInfoFlux)
+ public Flux<ChatRoomInfo> writeChatRoomInfo(Flux<ChatRoomInfo> chatRoomInfoFlux)
{
- chatRoomInfoFlux
+ return chatRoomInfoFlux
+ .log(
+ loggingCategory,
+ loggingLevel,
+ showOperatorLine)
.map(ChatRoomTo::from)
- .subscribe(chatroomTo -> chatRoomRepository.save(chatroomTo));
+ .flatMap(chatRoomRepository::save)
+ .map(ChatRoomTo::toChatRoomInfo);
}
@Override
public Flux<ChatRoomInfo> readChatRoomInfo()
{
- return Flux
- .fromIterable(chatRoomRepository.findAll())
- .map(chatRoomTo ->
- {
- UUID chatRoomId = UUID.fromString(chatRoomTo.getId());
- int shard = shardingStrategy.selectShard(chatRoomId);
-
- log.info(
- "{} - old shard: {}, new shard: {}",
- chatRoomId,
- chatRoomTo.getShard(),
- shard);
-
- return new ChatRoomInfo(
- chatRoomId,
- chatRoomTo.getName(),
- shard);
- });
+ return chatRoomRepository
+ .findAll()
+ .log(
+ loggingCategory,
+ loggingLevel,
+ showOperatorLine)
+ .map(ChatRoomTo::toChatRoomInfo);
}
@Override
- public void writeChatRoomData(UUID chatRoomId, Flux<Message> messageFlux)
+ public Flux<Message> writeChatRoomData(UUID chatRoomId, Flux<Message> messageFlux)
{
- messageFlux
+ return messageFlux
+ .log(
+ loggingCategory,
+ loggingLevel,
+ showOperatorLine)
.map(message -> MessageTo.from(chatRoomId, message))
- .subscribe(messageTo -> messageRepository.save(messageTo));
+ .flatMap(messageRepository::save)
+ .map(MessageTo::toMessage);
}
@Override
public Flux<Message> readChatRoomData(UUID chatRoomId)
{
- return Flux
- .fromIterable(messageRepository.findByChatRoomIdOrderBySerialAsc(chatRoomId.toString()))
- .map(messageTo -> messageTo.toMessage());
+ return messageRepository
+ .findByChatRoomIdOrderBySerialAsc(chatRoomId.toString())
+ .log(
+ loggingCategory,
+ loggingLevel,
+ showOperatorLine)
+ .map(MessageTo::toMessage);
}
}