package de.juplo.kafka.chat.backend.storage.mongodb;
 
+import de.juplo.kafka.chat.backend.ChatBackendProperties;
 import de.juplo.kafka.chat.backend.implementation.ShardingStrategy;
 import de.juplo.kafka.chat.backend.implementation.StorageStrategy;
 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
   @Bean
   public StorageStrategy storageStrategy(
       ChatRoomRepository chatRoomRepository,
-      MessageRepository messageRepository)
+      MessageRepository messageRepository,
+      ChatBackendProperties properties)
   {
-    return new MongoDbStorageStrategy(chatRoomRepository, messageRepository);
+    return new MongoDbStorageStrategy(
+        chatRoomRepository,
+        messageRepository,
+        properties.getProjectreactor().getLoggingLevel(),
+        properties.getProjectreactor().isShowOperatorLine());
   }
 }
 
 import reactor.core.publisher.Flux;
 
 import java.util.UUID;
+import java.util.logging.Level;
 
 
 @RequiredArgsConstructor
 {
   private final ChatRoomRepository chatRoomRepository;
   private final MessageRepository messageRepository;
+  private final String loggingCategory = MongoDbStorageStrategy.class.getSimpleName();
+  private final Level loggingLevel;
+  private final boolean showOperatorLine;
 
 
   @Override
   public Flux<ChatRoomInfo> writeChatRoomInfo(Flux<ChatRoomInfo> chatRoomInfoFlux)
   {
     return chatRoomInfoFlux
+        .log(
+            loggingCategory,
+            loggingLevel,
+            showOperatorLine)
         .map(ChatRoomTo::from)
         .map(chatRoomRepository::save)
         .map(ChatRoomTo::toChatRoomInfo);
   {
     return Flux
         .fromIterable(chatRoomRepository.findAll())
+        .log(
+            loggingCategory,
+            loggingLevel,
+            showOperatorLine)
         .map(ChatRoomTo::toChatRoomInfo);
   }
 
   public Flux<Message> writeChatRoomData(UUID chatRoomId, Flux<Message> messageFlux)
   {
     return messageFlux
+        .log(
+            loggingCategory,
+            loggingLevel,
+            showOperatorLine)
         .map(message -> MessageTo.from(chatRoomId, message))
         .map(messageRepository::save)
         .map(MessageTo::toMessage);
   {
     return Flux
         .fromIterable(messageRepository.findByChatRoomIdOrderBySerialAsc(chatRoomId.toString()))
+        .log(
+            loggingCategory,
+            loggingLevel,
+            showOperatorLine)
         .map(MessageTo::toMessage);
   }
 }
 
 import org.testcontainers.junit.jupiter.Testcontainers;
 
 import java.time.Clock;
+import java.util.logging.Level;
 
 
 @Testcontainers
         ChatRoomRepository chatRoomRepository,
         MessageRepository messageRepository)
     {
-      return new MongoDbStorageStrategy(chatRoomRepository, messageRepository);
+      return new MongoDbStorageStrategy(
+          chatRoomRepository,
+          messageRepository,
+          Level.FINE,
+          true);
     }
 
     @Bean