WIP:mongodb map vs subscribe - subscribe rausgezogen
authorKai Moritz <kai@juplo.de>
Mon, 19 Feb 2024 14:01:58 +0000 (15:01 +0100)
committerKai Moritz <kai@juplo.de>
Mon, 19 Feb 2024 14:01:58 +0000 (15:01 +0100)
src/main/java/de/juplo/kafka/chat/backend/ChatBackendApplication.java
src/main/java/de/juplo/kafka/chat/backend/api/ChatBackendController.java
src/main/java/de/juplo/kafka/chat/backend/implementation/StorageStrategy.java
src/main/java/de/juplo/kafka/chat/backend/storage/files/FilesStorageStrategy.java
src/main/java/de/juplo/kafka/chat/backend/storage/mongodb/ChatRoomTo.java
src/main/java/de/juplo/kafka/chat/backend/storage/mongodb/MongoDbStorageStrategy.java
src/main/java/de/juplo/kafka/chat/backend/storage/nostorage/NoStorageStorageStrategy.java
src/test/java/de/juplo/kafka/chat/backend/AbstractStorageStrategyIT.java

index 1eaa88c..76debbe 100644 (file)
@@ -32,7 +32,9 @@ public class ChatBackendApplication implements WebFluxConfigurer
        @PreDestroy
        public void onExit()
        {
-               storageStrategy.write(chatHomeService);
+               storageStrategy
+                               .write(chatHomeService)
+                               .subscribe();
        }
 
        public static void main(String[] args)
index f3efe79..acb84e6 100644 (file)
@@ -137,6 +137,6 @@ public class ChatBackendController
   @PostMapping("/store")
   public void store()
   {
-    storageStrategy.write(chatHomeService);
+    storageStrategy.write(chatHomeService).subscribe();
   }
 }
index 019db65..990d001 100644 (file)
@@ -16,20 +16,20 @@ public interface StorageStrategy
 {
   Logger log = LoggerFactory.getLogger(StorageStrategy.class.getCanonicalName());
 
-  default void write(ChatHomeService chatHomeService)
+  default Flux<ChatRoomInfo> write(ChatHomeService chatHomeService)
   {
-    write(
+    return write(
         chatHomeService,
         this::logSuccessChatHomeService,
         this::logFailureChatHomeService);
   }
 
-  default void write(
+  default Flux<ChatRoomInfo> write(
       ChatHomeService chatHomeService,
       ChatHomeServiceWrittenSuccessCallback successCallback,
       ChatHomeServiceWrittenFailureCallback failureCallback)
   {
-    writeChatRoomInfo(
+    return writeChatRoomInfo(
         chatHomeService
             .getChatRoomInfo()
             .doOnComplete(() -> successCallback.accept(chatHomeService))
@@ -40,25 +40,26 @@ public interface StorageStrategy
                     chatHomeService
                         .getChatRoomData(chatRoomInfo.getId())
                         .flatMapMany(chatRoomData -> chatRoomData.getMessages()),
+
                     this::logSuccessChatRoom,
-                    this::logFailureChatRoom)));
+                    this::logFailureChatRoom).subscribe()));
   }
 
-  void writeChatRoomInfo(Flux<ChatRoomInfo> chatRoomInfoFlux);
+  Flux<ChatRoomInfo> writeChatRoomInfo(Flux<ChatRoomInfo> chatRoomInfoFlux);
   Flux<ChatRoomInfo> readChatRoomInfo();
-  default void writeChatRoomData(
+  default Flux<Message> writeChatRoomData(
       UUID chatRoomId,
       Flux<Message> messageFlux,
       ChatRoomWrittenSuccessCallback successCallback,
       ChatRoomWrittenFailureCallback failureCallback)
   {
-    writeChatRoomData(
+    return writeChatRoomData(
         chatRoomId,
         messageFlux
             .doOnComplete(() -> successCallback.accept(chatRoomId))
             .doOnError(throwable -> failureCallback.accept(chatRoomId, throwable)));
   }
-  void writeChatRoomData(UUID chatRoomId, Flux<Message> messageFlux);
+  Flux<Message> writeChatRoomData(UUID chatRoomId, Flux<Message> messageFlux);
   Flux<Message> readChatRoomData(UUID chatRoomId);
 
   interface ChatHomeServiceWrittenSuccessCallback extends Consumer<ChatHomeService> {}
index 7e04a96..cdb4f0d 100644 (file)
@@ -35,7 +35,7 @@ public class FilesStorageStrategy implements StorageStrategy
 
 
   @Override
-  public void writeChatRoomInfo(Flux<ChatRoomInfo> chatRoomInfoFlux)
+  public Flux<ChatRoomInfo> writeChatRoomInfo(Flux<ChatRoomInfo> chatRoomInfoFlux)
   {
     Path path = chatroomsPath();
     log.info("Writing chatrooms to {}", path);
@@ -48,7 +48,7 @@ public class FilesStorageStrategy implements StorageStrategy
               .getFactory()
               .createGenerator(Files.newBufferedWriter(path, CREATE, TRUNCATE_EXISTING));
 
-      chatRoomInfoFlux
+      return chatRoomInfoFlux
           .log()
           .doFirst(() ->
           {
@@ -86,8 +86,7 @@ public class FilesStorageStrategy implements StorageStrategy
             {
               throw new RuntimeException(e);
             }
-          })
-          .subscribe();
+          });
     }
     catch (IOException e)
     {
@@ -121,7 +120,7 @@ public class FilesStorageStrategy implements StorageStrategy
   }
 
   @Override
-  public void writeChatRoomData(
+  public Flux<Message> writeChatRoomData(
       UUID chatRoomId,
       Flux<Message> messageFlux)
   {
@@ -136,7 +135,7 @@ public class FilesStorageStrategy implements StorageStrategy
               .getFactory()
               .createGenerator(Files.newBufferedWriter(path, CREATE, TRUNCATE_EXISTING));
 
-      messageFlux
+      return messageFlux
           .log()
           .doFirst(() ->
           {
@@ -174,8 +173,7 @@ public class FilesStorageStrategy implements StorageStrategy
             {
               throw new RuntimeException(e);
             }
-          })
-          .subscribe();
+          });
     }
     catch (IOException e)
     {
index 853ee1c..47596a2 100644 (file)
@@ -5,6 +5,8 @@ import lombok.*;
 import org.springframework.data.annotation.Id;
 import org.springframework.data.mongodb.core.mapping.Document;
 
+import java.util.UUID;
+
 
 @AllArgsConstructor
 @NoArgsConstructor
@@ -19,6 +21,13 @@ public class ChatRoomTo
   private String id;
   private String name;
 
+  public ChatRoomInfo toChatRoomInfo()
+  {
+    return new ChatRoomInfo(
+        UUID.fromString(id),
+        name);
+  }
+
   public static ChatRoomTo from(ChatRoomInfo chatRoomInfo)
   {
     return new ChatRoomTo(
index 780d64b..1428119 100644 (file)
@@ -21,12 +21,12 @@ public class MongoDbStorageStrategy implements StorageStrategy
 
 
   @Override
-  public void writeChatRoomInfo(Flux<ChatRoomInfo> chatRoomInfoFlux)
+  public Flux<ChatRoomInfo> writeChatRoomInfo(Flux<ChatRoomInfo> chatRoomInfoFlux)
   {
-    chatRoomInfoFlux
+    return chatRoomInfoFlux
         .map(ChatRoomTo::from)
         .map(chatroomTo -> chatRoomRepository.save(chatroomTo))
-        .subscribe();
+        .map(ChatRoomTo::toChatRoomInfo);
   }
 
   @Override
@@ -42,12 +42,12 @@ public class MongoDbStorageStrategy implements StorageStrategy
   }
 
   @Override
-  public void writeChatRoomData(UUID chatRoomId, Flux<Message> messageFlux)
+  public Flux<Message> writeChatRoomData(UUID chatRoomId, Flux<Message> messageFlux)
   {
-    messageFlux
+    return messageFlux
         .map(message -> MessageTo.from(chatRoomId, message))
         .map(messageTo -> messageRepository.save(messageTo))
-        .subscribe();
+        .map(MessageTo::toMessage);
   }
 
   @Override
index 6ca08e2..5902742 100644 (file)
@@ -13,14 +13,18 @@ import java.util.UUID;
 @Slf4j
 public class NoStorageStorageStrategy implements StorageStrategy
 {
-  @Override
-  public void write(ChatHomeService chatHomeService)
+  public Flux<ChatRoomInfo> write(ChatHomeService chatHomeService)
   {
-    log.info("Storage is disabled: Not storing {}", chatHomeService);
+    return Flux
+        .<ChatRoomInfo>empty()
+        .doOnComplete(() -> log.info("Storage is disabled: Not storing {}", chatHomeService));
+
   }
 
-  @Override
-  public void writeChatRoomInfo(Flux<ChatRoomInfo> chatRoomInfoFlux) {}
+  public Flux<ChatRoomInfo> writeChatRoomInfo(Flux<ChatRoomInfo> chatRoomInfoFlux)
+  {
+    return chatRoomInfoFlux;
+  }
 
   @Override
   public Flux<ChatRoomInfo> readChatRoomInfo()
@@ -29,7 +33,10 @@ public class NoStorageStorageStrategy implements StorageStrategy
   }
 
   @Override
-  public void writeChatRoomData(UUID chatRoomId, Flux<Message> messageFlux) {}
+  public Flux<Message> writeChatRoomData(UUID chatRoomId, Flux<Message> messageFlux)
+  {
+    return messageFlux;
+  }
 
   @Override
   public Flux<Message> readChatRoomData(UUID chatRoomId)
index 5eaf541..41e80ed 100644 (file)
@@ -28,7 +28,9 @@ public abstract class AbstractStorageStrategyIT
 
   protected void stop()
   {
-    getStorageStrategy().write(chathome);
+    getStorageStrategy()
+        .write(chathome)
+        .subscribe();
   }
 
   @Test