refactor: Moved extracted the `subscribe()`-call from `StorageStrategy`
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / storage / files / FilesStorageStrategy.java
index 7e04a96..cdb4f0d 100644 (file)
@@ -35,7 +35,7 @@ public class FilesStorageStrategy implements StorageStrategy
 
 
   @Override
-  public void writeChatRoomInfo(Flux<ChatRoomInfo> chatRoomInfoFlux)
+  public Flux<ChatRoomInfo> writeChatRoomInfo(Flux<ChatRoomInfo> chatRoomInfoFlux)
   {
     Path path = chatroomsPath();
     log.info("Writing chatrooms to {}", path);
@@ -48,7 +48,7 @@ public class FilesStorageStrategy implements StorageStrategy
               .getFactory()
               .createGenerator(Files.newBufferedWriter(path, CREATE, TRUNCATE_EXISTING));
 
-      chatRoomInfoFlux
+      return chatRoomInfoFlux
           .log()
           .doFirst(() ->
           {
@@ -86,8 +86,7 @@ public class FilesStorageStrategy implements StorageStrategy
             {
               throw new RuntimeException(e);
             }
-          })
-          .subscribe();
+          });
     }
     catch (IOException e)
     {
@@ -121,7 +120,7 @@ public class FilesStorageStrategy implements StorageStrategy
   }
 
   @Override
-  public void writeChatRoomData(
+  public Flux<Message> writeChatRoomData(
       UUID chatRoomId,
       Flux<Message> messageFlux)
   {
@@ -136,7 +135,7 @@ public class FilesStorageStrategy implements StorageStrategy
               .getFactory()
               .createGenerator(Files.newBufferedWriter(path, CREATE, TRUNCATE_EXISTING));
 
-      messageFlux
+      return messageFlux
           .log()
           .doFirst(() ->
           {
@@ -174,8 +173,7 @@ public class FilesStorageStrategy implements StorageStrategy
             {
               throw new RuntimeException(e);
             }
-          })
-          .subscribe();
+          });
     }
     catch (IOException e)
     {