X-Git-Url: http://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fchat%2Fbackend%2Fstorage%2Fmongodb%2FMongoDbStorageStrategy.java;h=13f3c0d6639f2e94fca4fef1b0079ef43c2dc700;hb=8300dcd98f681893a077051560151a8f1b94e38d;hp=780d64be3e19a2bf11ec000c53db22e5658b1bb9;hpb=c786c4a079da27f54e75d22e2d2c2a1693aba078;p=demos%2Fkafka%2Fchat diff --git a/src/main/java/de/juplo/kafka/chat/backend/storage/mongodb/MongoDbStorageStrategy.java b/src/main/java/de/juplo/kafka/chat/backend/storage/mongodb/MongoDbStorageStrategy.java index 780d64be..13f3c0d6 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/storage/mongodb/MongoDbStorageStrategy.java +++ b/src/main/java/de/juplo/kafka/chat/backend/storage/mongodb/MongoDbStorageStrategy.java @@ -2,13 +2,13 @@ package de.juplo.kafka.chat.backend.storage.mongodb; import de.juplo.kafka.chat.backend.domain.ChatRoomInfo; import de.juplo.kafka.chat.backend.domain.Message; -import de.juplo.kafka.chat.backend.implementation.ShardingStrategy; import de.juplo.kafka.chat.backend.implementation.StorageStrategy; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import reactor.core.publisher.Flux; import java.util.UUID; +import java.util.logging.Level; @RequiredArgsConstructor @@ -17,44 +17,58 @@ public class MongoDbStorageStrategy implements StorageStrategy { private final ChatRoomRepository chatRoomRepository; private final MessageRepository messageRepository; - private final ShardingStrategy shardingStrategy; + private final String loggingCategory = MongoDbStorageStrategy.class.getSimpleName(); + private final Level loggingLevel; + private final boolean showOperatorLine; @Override - public void writeChatRoomInfo(Flux chatRoomInfoFlux) + public Flux writeChatRoomInfo(Flux chatRoomInfoFlux) { - chatRoomInfoFlux + return chatRoomInfoFlux + .log( + loggingCategory, + loggingLevel, + showOperatorLine) .map(ChatRoomTo::from) - .map(chatroomTo -> chatRoomRepository.save(chatroomTo)) - .subscribe(); + .flatMap(chatRoomRepository::save) + .map(ChatRoomTo::toChatRoomInfo); } @Override public Flux readChatRoomInfo() { - return Flux - .fromIterable(chatRoomRepository.findAll()) - .map(chatRoomTo -> - { - UUID chatRoomId = UUID.fromString(chatRoomTo.getId()); - return new ChatRoomInfo(chatRoomId, chatRoomTo.getName(), null); - }); + return chatRoomRepository + .findAll() + .log( + loggingCategory, + loggingLevel, + showOperatorLine) + .map(ChatRoomTo::toChatRoomInfo); } @Override - public void writeChatRoomData(UUID chatRoomId, Flux messageFlux) + public Flux writeChatRoomData(UUID chatRoomId, Flux messageFlux) { - messageFlux + return messageFlux + .log( + loggingCategory, + loggingLevel, + showOperatorLine) .map(message -> MessageTo.from(chatRoomId, message)) - .map(messageTo -> messageRepository.save(messageTo)) - .subscribe(); + .flatMap(messageRepository::save) + .map(MessageTo::toMessage); } @Override public Flux readChatRoomData(UUID chatRoomId) { - return Flux - .fromIterable(messageRepository.findByChatRoomIdOrderBySerialAsc(chatRoomId.toString())) - .map(messageTo -> messageTo.toMessage()); + return messageRepository + .findByChatRoomIdOrderBySerialAsc(chatRoomId.toString()) + .log( + loggingCategory, + loggingLevel, + showOperatorLine) + .map(MessageTo::toMessage); } }