feat: Introduced config-parameters for the `io.projectreactor`-logging
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / storage / files / FilesStorageStrategy.java
index cb7dd31..aaa6159 100644 (file)
@@ -17,6 +17,7 @@ import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.util.UUID;
+import java.util.logging.Level;
 
 import static java.nio.file.StandardOpenOption.CREATE;
 import static java.nio.file.StandardOpenOption.TRUNCATE_EXISTING;
@@ -32,10 +33,12 @@ public class FilesStorageStrategy implements StorageStrategy
   private final Path storagePath;
   private final ShardingStrategy shardingStrategy;
   private final ObjectMapper mapper;
+  private final Level loggingLevel;
+  private final boolean showOperatorLine;
 
 
   @Override
-  public void writeChatRoomInfo(Flux<ChatRoomInfo> chatRoomInfoFlux)
+  public Flux<ChatRoomInfo> writeChatRoomInfo(Flux<ChatRoomInfo> chatRoomInfoFlux)
   {
     Path path = chatroomsPath();
     log.info("Writing chatrooms to {}", path);
@@ -48,8 +51,11 @@ public class FilesStorageStrategy implements StorageStrategy
               .getFactory()
               .createGenerator(Files.newBufferedWriter(path, CREATE, TRUNCATE_EXISTING));
 
-      chatRoomInfoFlux
-          .log()
+      return chatRoomInfoFlux
+          .log(
+              FilesStorageStrategy.class.getSimpleName(),
+              loggingLevel,
+              showOperatorLine)
           .doFirst(() ->
           {
             try
@@ -74,12 +80,13 @@ public class FilesStorageStrategy implements StorageStrategy
               throw new RuntimeException(e);
             }
           })
-          .subscribe(chatRoomInfo ->
+          .map(chatRoomInfo ->
           {
             try
             {
               ChatRoomInfoTo chatRoomInfoTo = ChatRoomInfoTo.from(chatRoomInfo);
               generator.writeObject(chatRoomInfoTo);
+              return chatRoomInfo;
             }
             catch (IOException e)
             {
@@ -99,7 +106,10 @@ public class FilesStorageStrategy implements StorageStrategy
     JavaType type = mapper.getTypeFactory().constructType(ChatRoomInfoTo.class);
     return Flux
         .from(new JsonFilePublisher<ChatRoomInfoTo>(chatroomsPath(), mapper, type))
-        .log()
+        .log(
+            FilesStorageStrategy.class.getSimpleName(),
+            loggingLevel,
+            showOperatorLine)
         .map(chatRoomInfoTo ->
         {
           UUID chatRoomId = chatRoomInfoTo.getId();
@@ -119,7 +129,7 @@ public class FilesStorageStrategy implements StorageStrategy
   }
 
   @Override
-  public void writeChatRoomData(
+  public Flux<Message> writeChatRoomData(
       UUID chatRoomId,
       Flux<Message> messageFlux)
   {
@@ -134,8 +144,11 @@ public class FilesStorageStrategy implements StorageStrategy
               .getFactory()
               .createGenerator(Files.newBufferedWriter(path, CREATE, TRUNCATE_EXISTING));
 
-      messageFlux
-          .log()
+      return messageFlux
+          .log(
+              FilesStorageStrategy.class.getSimpleName(),
+              loggingLevel,
+              showOperatorLine)
           .doFirst(() ->
           {
             try
@@ -160,12 +173,13 @@ public class FilesStorageStrategy implements StorageStrategy
               throw new RuntimeException(e);
             }
           })
-          .subscribe(message ->
+          .map(message ->
           {
             try
             {
               MessageTo messageTo = MessageTo.from(message);
               generator.writeObject(messageTo);
+              return message;
             }
             catch (IOException e)
             {
@@ -185,7 +199,10 @@ public class FilesStorageStrategy implements StorageStrategy
     JavaType type = mapper.getTypeFactory().constructType(MessageTo.class);
     return Flux
         .from(new JsonFilePublisher<MessageTo>(chatroomPath(chatRoomId), mapper, type))
-        .log()
+        .log(
+            FilesStorageStrategy.class.getSimpleName(),
+            loggingLevel,
+            showOperatorLine)
         .map(MessageTo::toMessage);
   }