refactor: Moved extracted the `subscribe()`-call from `StorageStrategy`
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / storage / files / FilesStorageStrategy.java
index cb7dd31..cdb4f0d 100644 (file)
@@ -35,7 +35,7 @@ public class FilesStorageStrategy implements StorageStrategy
 
 
   @Override
-  public void writeChatRoomInfo(Flux<ChatRoomInfo> chatRoomInfoFlux)
+  public Flux<ChatRoomInfo> writeChatRoomInfo(Flux<ChatRoomInfo> chatRoomInfoFlux)
   {
     Path path = chatroomsPath();
     log.info("Writing chatrooms to {}", path);
@@ -48,7 +48,7 @@ public class FilesStorageStrategy implements StorageStrategy
               .getFactory()
               .createGenerator(Files.newBufferedWriter(path, CREATE, TRUNCATE_EXISTING));
 
-      chatRoomInfoFlux
+      return chatRoomInfoFlux
           .log()
           .doFirst(() ->
           {
@@ -74,12 +74,13 @@ public class FilesStorageStrategy implements StorageStrategy
               throw new RuntimeException(e);
             }
           })
-          .subscribe(chatRoomInfo ->
+          .map(chatRoomInfo ->
           {
             try
             {
               ChatRoomInfoTo chatRoomInfoTo = ChatRoomInfoTo.from(chatRoomInfo);
               generator.writeObject(chatRoomInfoTo);
+              return chatRoomInfo;
             }
             catch (IOException e)
             {
@@ -119,7 +120,7 @@ public class FilesStorageStrategy implements StorageStrategy
   }
 
   @Override
-  public void writeChatRoomData(
+  public Flux<Message> writeChatRoomData(
       UUID chatRoomId,
       Flux<Message> messageFlux)
   {
@@ -134,7 +135,7 @@ public class FilesStorageStrategy implements StorageStrategy
               .getFactory()
               .createGenerator(Files.newBufferedWriter(path, CREATE, TRUNCATE_EXISTING));
 
-      messageFlux
+      return messageFlux
           .log()
           .doFirst(() ->
           {
@@ -160,12 +161,13 @@ public class FilesStorageStrategy implements StorageStrategy
               throw new RuntimeException(e);
             }
           })
-          .subscribe(message ->
+          .map(message ->
           {
             try
             {
               MessageTo messageTo = MessageTo.from(message);
               generator.writeObject(messageTo);
+              return message;
             }
             catch (IOException e)
             {