X-Git-Url: http://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fchat%2Fbackend%2Fstorage%2Fmongodb%2FMongoDbStorageStrategy.java;h=13f3c0d6639f2e94fca4fef1b0079ef43c2dc700;hb=13f86063f851fc2c4ad6de56c8edb78bff9d0592;hp=644ab8870947172edcf19868164cfe8063c5f176;hpb=13b6b7ddf8f0c14626843fe58af7aaf66bd4b900;p=demos%2Fkafka%2Fchat diff --git a/src/main/java/de/juplo/kafka/chat/backend/storage/mongodb/MongoDbStorageStrategy.java b/src/main/java/de/juplo/kafka/chat/backend/storage/mongodb/MongoDbStorageStrategy.java index 644ab887..13f3c0d6 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/storage/mongodb/MongoDbStorageStrategy.java +++ b/src/main/java/de/juplo/kafka/chat/backend/storage/mongodb/MongoDbStorageStrategy.java @@ -1,14 +1,14 @@ -package de.juplo.kafka.chat.backend.persistence.storage.mongodb; +package de.juplo.kafka.chat.backend.storage.mongodb; import de.juplo.kafka.chat.backend.domain.ChatRoomInfo; import de.juplo.kafka.chat.backend.domain.Message; -import de.juplo.kafka.chat.backend.persistence.ShardingStrategy; -import de.juplo.kafka.chat.backend.persistence.StorageStrategy; +import de.juplo.kafka.chat.backend.implementation.StorageStrategy; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import reactor.core.publisher.Flux; import java.util.UUID; +import java.util.logging.Level; @RequiredArgsConstructor @@ -17,53 +17,58 @@ public class MongoDbStorageStrategy implements StorageStrategy { private final ChatRoomRepository chatRoomRepository; private final MessageRepository messageRepository; - private final ShardingStrategy shardingStrategy; + private final String loggingCategory = MongoDbStorageStrategy.class.getSimpleName(); + private final Level loggingLevel; + private final boolean showOperatorLine; @Override - public void writeChatRoomInfo(Flux chatRoomInfoFlux) + public Flux writeChatRoomInfo(Flux chatRoomInfoFlux) { - chatRoomInfoFlux + return chatRoomInfoFlux + .log( + loggingCategory, + loggingLevel, + showOperatorLine) .map(ChatRoomTo::from) - .subscribe(chatroomTo -> chatRoomRepository.save(chatroomTo)); + .flatMap(chatRoomRepository::save) + .map(ChatRoomTo::toChatRoomInfo); } @Override public Flux readChatRoomInfo() { - return Flux - .fromIterable(chatRoomRepository.findAll()) - .map(chatRoomTo -> - { - UUID chatRoomId = UUID.fromString(chatRoomTo.getId()); - int shard = shardingStrategy.selectShard(chatRoomId); - - log.info( - "{} - old shard: {}, new shard: {}", - chatRoomId, - chatRoomTo.getShard(), - shard); - - return new ChatRoomInfo( - chatRoomId, - chatRoomTo.getName(), - shard); - }); + return chatRoomRepository + .findAll() + .log( + loggingCategory, + loggingLevel, + showOperatorLine) + .map(ChatRoomTo::toChatRoomInfo); } @Override - public void writeChatRoomData(UUID chatRoomId, Flux messageFlux) + public Flux writeChatRoomData(UUID chatRoomId, Flux messageFlux) { - messageFlux + return messageFlux + .log( + loggingCategory, + loggingLevel, + showOperatorLine) .map(message -> MessageTo.from(chatRoomId, message)) - .subscribe(messageTo -> messageRepository.save(messageTo)); + .flatMap(messageRepository::save) + .map(MessageTo::toMessage); } @Override public Flux readChatRoomData(UUID chatRoomId) { - return Flux - .fromIterable(messageRepository.findByChatRoomIdOrderBySerialAsc(chatRoomId.toString())) - .map(messageTo -> messageTo.toMessage()); + return messageRepository + .findByChatRoomIdOrderBySerialAsc(chatRoomId.toString()) + .log( + loggingCategory, + loggingLevel, + showOperatorLine) + .map(MessageTo::toMessage); } }