TMP
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / storage / mongodb / MongoDbStorageStrategy.java
index fb04039..13f3c0d 100644 (file)
@@ -8,6 +8,7 @@ import lombok.extern.slf4j.Slf4j;
 import reactor.core.publisher.Flux;
 
 import java.util.UUID;
+import java.util.logging.Level;
 
 
 @RequiredArgsConstructor
@@ -16,22 +17,33 @@ public class MongoDbStorageStrategy implements StorageStrategy
 {
   private final ChatRoomRepository chatRoomRepository;
   private final MessageRepository messageRepository;
+  private final String loggingCategory = MongoDbStorageStrategy.class.getSimpleName();
+  private final Level loggingLevel;
+  private final boolean showOperatorLine;
 
 
   @Override
   public Flux<ChatRoomInfo> writeChatRoomInfo(Flux<ChatRoomInfo> chatRoomInfoFlux)
   {
     return chatRoomInfoFlux
+        .log(
+            loggingCategory,
+            loggingLevel,
+            showOperatorLine)
         .map(ChatRoomTo::from)
-        .map(chatRoomRepository::save)
+        .flatMap(chatRoomRepository::save)
         .map(ChatRoomTo::toChatRoomInfo);
   }
 
   @Override
   public Flux<ChatRoomInfo> readChatRoomInfo()
   {
-    return Flux
-        .fromIterable(chatRoomRepository.findAll())
+    return chatRoomRepository
+        .findAll()
+        .log(
+            loggingCategory,
+            loggingLevel,
+            showOperatorLine)
         .map(ChatRoomTo::toChatRoomInfo);
   }
 
@@ -39,16 +51,24 @@ public class MongoDbStorageStrategy implements StorageStrategy
   public Flux<Message> writeChatRoomData(UUID chatRoomId, Flux<Message> messageFlux)
   {
     return messageFlux
+        .log(
+            loggingCategory,
+            loggingLevel,
+            showOperatorLine)
         .map(message -> MessageTo.from(chatRoomId, message))
-        .map(messageRepository::save)
+        .flatMap(messageRepository::save)
         .map(MessageTo::toMessage);
   }
 
   @Override
   public Flux<Message> readChatRoomData(UUID chatRoomId)
   {
-    return Flux
-        .fromIterable(messageRepository.findByChatRoomIdOrderBySerialAsc(chatRoomId.toString()))
+    return messageRepository
+        .findByChatRoomIdOrderBySerialAsc(chatRoomId.toString())
+        .log(
+            loggingCategory,
+            loggingLevel,
+            showOperatorLine)
         .map(MessageTo::toMessage);
   }
 }