WIP:mongodb map vs subscribe - subscribe rausgezogen
authorKai Moritz <kai@juplo.de>
Sun, 18 Feb 2024 18:44:05 +0000 (19:44 +0100)
committerKai Moritz <kai@juplo.de>
Sun, 18 Feb 2024 18:44:05 +0000 (19:44 +0100)
src/main/java/de/juplo/kafka/chat/backend/implementation/StorageStrategy.java
src/main/java/de/juplo/kafka/chat/backend/storage/files/FilesStorageStrategy.java
src/main/java/de/juplo/kafka/chat/backend/storage/mongodb/ChatRoomTo.java
src/main/java/de/juplo/kafka/chat/backend/storage/mongodb/MongoDbStorageStrategy.java
src/main/java/de/juplo/kafka/chat/backend/storage/nostorage/NoStorageStorageConfiguration.java

index 9dd7625..a62f408 100644 (file)
@@ -31,21 +31,21 @@ public interface StorageStrategy
                     this::logFailure)));
   }
 
-  void writeChatRoomInfo(Flux<ChatRoomInfo> chatRoomInfoFlux);
+  Flux<ChatRoomInfo> writeChatRoomInfo(Flux<ChatRoomInfo> chatRoomInfoFlux);
   Flux<ChatRoomInfo> readChatRoomInfo();
-  default void writeChatRoomData(
+  default Flux<Message> writeChatRoomData(
       UUID chatRoomId,
       Flux<Message> messageFlux,
       SuccessCallback successCallback,
       FailureCallback failureCallback)
   {
-    writeChatRoomData(
+    return writeChatRoomData(
         chatRoomId,
         messageFlux
             .doOnComplete(() -> successCallback.accept(chatRoomId))
             .doOnError(throwable -> failureCallback.accept(chatRoomId, throwable)));
   }
-  void writeChatRoomData(UUID chatRoomId, Flux<Message> messageFlux);
+  Flux<Message> writeChatRoomData(UUID chatRoomId, Flux<Message> messageFlux);
   Flux<Message> readChatRoomData(UUID chatRoomId);
 
   interface SuccessCallback extends Consumer<UUID> {}
index 7e04a96..cdb4f0d 100644 (file)
@@ -35,7 +35,7 @@ public class FilesStorageStrategy implements StorageStrategy
 
 
   @Override
-  public void writeChatRoomInfo(Flux<ChatRoomInfo> chatRoomInfoFlux)
+  public Flux<ChatRoomInfo> writeChatRoomInfo(Flux<ChatRoomInfo> chatRoomInfoFlux)
   {
     Path path = chatroomsPath();
     log.info("Writing chatrooms to {}", path);
@@ -48,7 +48,7 @@ public class FilesStorageStrategy implements StorageStrategy
               .getFactory()
               .createGenerator(Files.newBufferedWriter(path, CREATE, TRUNCATE_EXISTING));
 
-      chatRoomInfoFlux
+      return chatRoomInfoFlux
           .log()
           .doFirst(() ->
           {
@@ -86,8 +86,7 @@ public class FilesStorageStrategy implements StorageStrategy
             {
               throw new RuntimeException(e);
             }
-          })
-          .subscribe();
+          });
     }
     catch (IOException e)
     {
@@ -121,7 +120,7 @@ public class FilesStorageStrategy implements StorageStrategy
   }
 
   @Override
-  public void writeChatRoomData(
+  public Flux<Message> writeChatRoomData(
       UUID chatRoomId,
       Flux<Message> messageFlux)
   {
@@ -136,7 +135,7 @@ public class FilesStorageStrategy implements StorageStrategy
               .getFactory()
               .createGenerator(Files.newBufferedWriter(path, CREATE, TRUNCATE_EXISTING));
 
-      messageFlux
+      return messageFlux
           .log()
           .doFirst(() ->
           {
@@ -174,8 +173,7 @@ public class FilesStorageStrategy implements StorageStrategy
             {
               throw new RuntimeException(e);
             }
-          })
-          .subscribe();
+          });
     }
     catch (IOException e)
     {
index 853ee1c..47596a2 100644 (file)
@@ -5,6 +5,8 @@ import lombok.*;
 import org.springframework.data.annotation.Id;
 import org.springframework.data.mongodb.core.mapping.Document;
 
+import java.util.UUID;
+
 
 @AllArgsConstructor
 @NoArgsConstructor
@@ -19,6 +21,13 @@ public class ChatRoomTo
   private String id;
   private String name;
 
+  public ChatRoomInfo toChatRoomInfo()
+  {
+    return new ChatRoomInfo(
+        UUID.fromString(id),
+        name);
+  }
+
   public static ChatRoomTo from(ChatRoomInfo chatRoomInfo)
   {
     return new ChatRoomTo(
index 780d64b..1428119 100644 (file)
@@ -21,12 +21,12 @@ public class MongoDbStorageStrategy implements StorageStrategy
 
 
   @Override
-  public void writeChatRoomInfo(Flux<ChatRoomInfo> chatRoomInfoFlux)
+  public Flux<ChatRoomInfo> writeChatRoomInfo(Flux<ChatRoomInfo> chatRoomInfoFlux)
   {
-    chatRoomInfoFlux
+    return chatRoomInfoFlux
         .map(ChatRoomTo::from)
         .map(chatroomTo -> chatRoomRepository.save(chatroomTo))
-        .subscribe();
+        .map(ChatRoomTo::toChatRoomInfo);
   }
 
   @Override
@@ -42,12 +42,12 @@ public class MongoDbStorageStrategy implements StorageStrategy
   }
 
   @Override
-  public void writeChatRoomData(UUID chatRoomId, Flux<Message> messageFlux)
+  public Flux<Message> writeChatRoomData(UUID chatRoomId, Flux<Message> messageFlux)
   {
-    messageFlux
+    return messageFlux
         .map(message -> MessageTo.from(chatRoomId, message))
         .map(messageTo -> messageRepository.save(messageTo))
-        .subscribe();
+        .map(MessageTo::toMessage);
   }
 
   @Override
index 1b20aa3..6215069 100644 (file)
@@ -32,7 +32,10 @@ public class NoStorageStorageConfiguration
     return new StorageStrategy()
     {
       @Override
-      public void writeChatRoomInfo(Flux<ChatRoomInfo> chatRoomInfoFlux) {}
+      public Flux<ChatRoomInfo> writeChatRoomInfo(Flux<ChatRoomInfo> chatRoomInfoFlux)
+      {
+        return chatRoomInfoFlux;
+      }
 
       @Override
       public Flux<ChatRoomInfo> readChatRoomInfo()
@@ -41,7 +44,10 @@ public class NoStorageStorageConfiguration
       }
 
       @Override
-      public void writeChatRoomData(UUID chatRoomId, Flux<Message> messageFlux) {}
+      public Flux<Message> writeChatRoomData(UUID chatRoomId, Flux<Message> messageFlux)
+      {
+        return messageFlux;
+      }
 
       @Override
       public Flux<Message> readChatRoomData(UUID chatRoomId)