From aa32b5526095ab031def88d0ef5938322c379f06 Mon Sep 17 00:00:00 2001
From: Kai Moritz <kai@juplo.de>
Date: Mon, 19 Feb 2024 15:01:58 +0100
Subject: [PATCH] WIP:mongodb map vs subscribe - subscribe rausgezogen

---
 .../chat/backend/ChatBackendApplication.java  |  4 +++-
 .../backend/api/ChatBackendController.java    |  2 +-
 .../implementation/StorageStrategy.java       | 19 ++++++++++---------
 .../storage/files/FilesStorageStrategy.java   | 14 ++++++--------
 .../backend/storage/mongodb/ChatRoomTo.java   |  9 +++++++++
 .../mongodb/MongoDbStorageStrategy.java       | 12 ++++++------
 .../nostorage/NoStorageStorageStrategy.java   | 19 +++++++++++++------
 .../backend/AbstractStorageStrategyIT.java    |  4 +++-
 8 files changed, 51 insertions(+), 32 deletions(-)

diff --git a/src/main/java/de/juplo/kafka/chat/backend/ChatBackendApplication.java b/src/main/java/de/juplo/kafka/chat/backend/ChatBackendApplication.java
index 1eaa88c7..76debbed 100644
--- a/src/main/java/de/juplo/kafka/chat/backend/ChatBackendApplication.java
+++ b/src/main/java/de/juplo/kafka/chat/backend/ChatBackendApplication.java
@@ -32,7 +32,9 @@ public class ChatBackendApplication implements WebFluxConfigurer
 	@PreDestroy
 	public void onExit()
 	{
-		storageStrategy.write(chatHomeService);
+		storageStrategy
+				.write(chatHomeService)
+				.subscribe();
 	}
 
 	public static void main(String[] args)
diff --git a/src/main/java/de/juplo/kafka/chat/backend/api/ChatBackendController.java b/src/main/java/de/juplo/kafka/chat/backend/api/ChatBackendController.java
index f3efe791..acb84e64 100644
--- a/src/main/java/de/juplo/kafka/chat/backend/api/ChatBackendController.java
+++ b/src/main/java/de/juplo/kafka/chat/backend/api/ChatBackendController.java
@@ -137,6 +137,6 @@ public class ChatBackendController
   @PostMapping("/store")
   public void store()
   {
-    storageStrategy.write(chatHomeService);
+    storageStrategy.write(chatHomeService).subscribe();
   }
 }
diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/StorageStrategy.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/StorageStrategy.java
index 019db657..990d001a 100644
--- a/src/main/java/de/juplo/kafka/chat/backend/implementation/StorageStrategy.java
+++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/StorageStrategy.java
@@ -16,20 +16,20 @@ public interface StorageStrategy
 {
   Logger log = LoggerFactory.getLogger(StorageStrategy.class.getCanonicalName());
 
-  default void write(ChatHomeService chatHomeService)
+  default Flux<ChatRoomInfo> write(ChatHomeService chatHomeService)
   {
-    write(
+    return write(
         chatHomeService,
         this::logSuccessChatHomeService,
         this::logFailureChatHomeService);
   }
 
-  default void write(
+  default Flux<ChatRoomInfo> write(
       ChatHomeService chatHomeService,
       ChatHomeServiceWrittenSuccessCallback successCallback,
       ChatHomeServiceWrittenFailureCallback failureCallback)
   {
-    writeChatRoomInfo(
+    return writeChatRoomInfo(
         chatHomeService
             .getChatRoomInfo()
             .doOnComplete(() -> successCallback.accept(chatHomeService))
@@ -40,25 +40,26 @@ public interface StorageStrategy
                     chatHomeService
                         .getChatRoomData(chatRoomInfo.getId())
                         .flatMapMany(chatRoomData -> chatRoomData.getMessages()),
+
                     this::logSuccessChatRoom,
-                    this::logFailureChatRoom)));
+                    this::logFailureChatRoom).subscribe()));
   }
 
-  void writeChatRoomInfo(Flux<ChatRoomInfo> chatRoomInfoFlux);
+  Flux<ChatRoomInfo> writeChatRoomInfo(Flux<ChatRoomInfo> chatRoomInfoFlux);
   Flux<ChatRoomInfo> readChatRoomInfo();
-  default void writeChatRoomData(
+  default Flux<Message> writeChatRoomData(
       UUID chatRoomId,
       Flux<Message> messageFlux,
       ChatRoomWrittenSuccessCallback successCallback,
       ChatRoomWrittenFailureCallback failureCallback)
   {
-    writeChatRoomData(
+    return writeChatRoomData(
         chatRoomId,
         messageFlux
             .doOnComplete(() -> successCallback.accept(chatRoomId))
             .doOnError(throwable -> failureCallback.accept(chatRoomId, throwable)));
   }
-  void writeChatRoomData(UUID chatRoomId, Flux<Message> messageFlux);
+  Flux<Message> writeChatRoomData(UUID chatRoomId, Flux<Message> messageFlux);
   Flux<Message> readChatRoomData(UUID chatRoomId);
 
   interface ChatHomeServiceWrittenSuccessCallback extends Consumer<ChatHomeService> {}
diff --git a/src/main/java/de/juplo/kafka/chat/backend/storage/files/FilesStorageStrategy.java b/src/main/java/de/juplo/kafka/chat/backend/storage/files/FilesStorageStrategy.java
index 7e04a964..cdb4f0d7 100644
--- a/src/main/java/de/juplo/kafka/chat/backend/storage/files/FilesStorageStrategy.java
+++ b/src/main/java/de/juplo/kafka/chat/backend/storage/files/FilesStorageStrategy.java
@@ -35,7 +35,7 @@ public class FilesStorageStrategy implements StorageStrategy
 
 
   @Override
-  public void writeChatRoomInfo(Flux<ChatRoomInfo> chatRoomInfoFlux)
+  public Flux<ChatRoomInfo> writeChatRoomInfo(Flux<ChatRoomInfo> chatRoomInfoFlux)
   {
     Path path = chatroomsPath();
     log.info("Writing chatrooms to {}", path);
@@ -48,7 +48,7 @@ public class FilesStorageStrategy implements StorageStrategy
               .getFactory()
               .createGenerator(Files.newBufferedWriter(path, CREATE, TRUNCATE_EXISTING));
 
-      chatRoomInfoFlux
+      return chatRoomInfoFlux
           .log()
           .doFirst(() ->
           {
@@ -86,8 +86,7 @@ public class FilesStorageStrategy implements StorageStrategy
             {
               throw new RuntimeException(e);
             }
-          })
-          .subscribe();
+          });
     }
     catch (IOException e)
     {
@@ -121,7 +120,7 @@ public class FilesStorageStrategy implements StorageStrategy
   }
 
   @Override
-  public void writeChatRoomData(
+  public Flux<Message> writeChatRoomData(
       UUID chatRoomId,
       Flux<Message> messageFlux)
   {
@@ -136,7 +135,7 @@ public class FilesStorageStrategy implements StorageStrategy
               .getFactory()
               .createGenerator(Files.newBufferedWriter(path, CREATE, TRUNCATE_EXISTING));
 
-      messageFlux
+      return messageFlux
           .log()
           .doFirst(() ->
           {
@@ -174,8 +173,7 @@ public class FilesStorageStrategy implements StorageStrategy
             {
               throw new RuntimeException(e);
             }
-          })
-          .subscribe();
+          });
     }
     catch (IOException e)
     {
diff --git a/src/main/java/de/juplo/kafka/chat/backend/storage/mongodb/ChatRoomTo.java b/src/main/java/de/juplo/kafka/chat/backend/storage/mongodb/ChatRoomTo.java
index 853ee1cf..47596a2e 100644
--- a/src/main/java/de/juplo/kafka/chat/backend/storage/mongodb/ChatRoomTo.java
+++ b/src/main/java/de/juplo/kafka/chat/backend/storage/mongodb/ChatRoomTo.java
@@ -5,6 +5,8 @@ import lombok.*;
 import org.springframework.data.annotation.Id;
 import org.springframework.data.mongodb.core.mapping.Document;
 
+import java.util.UUID;
+
 
 @AllArgsConstructor
 @NoArgsConstructor
@@ -19,6 +21,13 @@ public class ChatRoomTo
   private String id;
   private String name;
 
+  public ChatRoomInfo toChatRoomInfo()
+  {
+    return new ChatRoomInfo(
+        UUID.fromString(id),
+        name);
+  }
+
   public static ChatRoomTo from(ChatRoomInfo chatRoomInfo)
   {
     return new ChatRoomTo(
diff --git a/src/main/java/de/juplo/kafka/chat/backend/storage/mongodb/MongoDbStorageStrategy.java b/src/main/java/de/juplo/kafka/chat/backend/storage/mongodb/MongoDbStorageStrategy.java
index 780d64be..1428119e 100644
--- a/src/main/java/de/juplo/kafka/chat/backend/storage/mongodb/MongoDbStorageStrategy.java
+++ b/src/main/java/de/juplo/kafka/chat/backend/storage/mongodb/MongoDbStorageStrategy.java
@@ -21,12 +21,12 @@ public class MongoDbStorageStrategy implements StorageStrategy
 
 
   @Override
-  public void writeChatRoomInfo(Flux<ChatRoomInfo> chatRoomInfoFlux)
+  public Flux<ChatRoomInfo> writeChatRoomInfo(Flux<ChatRoomInfo> chatRoomInfoFlux)
   {
-    chatRoomInfoFlux
+    return chatRoomInfoFlux
         .map(ChatRoomTo::from)
         .map(chatroomTo -> chatRoomRepository.save(chatroomTo))
-        .subscribe();
+        .map(ChatRoomTo::toChatRoomInfo);
   }
 
   @Override
@@ -42,12 +42,12 @@ public class MongoDbStorageStrategy implements StorageStrategy
   }
 
   @Override
-  public void writeChatRoomData(UUID chatRoomId, Flux<Message> messageFlux)
+  public Flux<Message> writeChatRoomData(UUID chatRoomId, Flux<Message> messageFlux)
   {
-    messageFlux
+    return messageFlux
         .map(message -> MessageTo.from(chatRoomId, message))
         .map(messageTo -> messageRepository.save(messageTo))
-        .subscribe();
+        .map(MessageTo::toMessage);
   }
 
   @Override
diff --git a/src/main/java/de/juplo/kafka/chat/backend/storage/nostorage/NoStorageStorageStrategy.java b/src/main/java/de/juplo/kafka/chat/backend/storage/nostorage/NoStorageStorageStrategy.java
index 6ca08e22..59027424 100644
--- a/src/main/java/de/juplo/kafka/chat/backend/storage/nostorage/NoStorageStorageStrategy.java
+++ b/src/main/java/de/juplo/kafka/chat/backend/storage/nostorage/NoStorageStorageStrategy.java
@@ -13,14 +13,18 @@ import java.util.UUID;
 @Slf4j
 public class NoStorageStorageStrategy implements StorageStrategy
 {
-  @Override
-  public void write(ChatHomeService chatHomeService)
+  public Flux<ChatRoomInfo> write(ChatHomeService chatHomeService)
   {
-    log.info("Storage is disabled: Not storing {}", chatHomeService);
+    return Flux
+        .<ChatRoomInfo>empty()
+        .doOnComplete(() -> log.info("Storage is disabled: Not storing {}", chatHomeService));
+
   }
 
-  @Override
-  public void writeChatRoomInfo(Flux<ChatRoomInfo> chatRoomInfoFlux) {}
+  public Flux<ChatRoomInfo> writeChatRoomInfo(Flux<ChatRoomInfo> chatRoomInfoFlux)
+  {
+    return chatRoomInfoFlux;
+  }
 
   @Override
   public Flux<ChatRoomInfo> readChatRoomInfo()
@@ -29,7 +33,10 @@ public class NoStorageStorageStrategy implements StorageStrategy
   }
 
   @Override
-  public void writeChatRoomData(UUID chatRoomId, Flux<Message> messageFlux) {}
+  public Flux<Message> writeChatRoomData(UUID chatRoomId, Flux<Message> messageFlux)
+  {
+    return messageFlux;
+  }
 
   @Override
   public Flux<Message> readChatRoomData(UUID chatRoomId)
diff --git a/src/test/java/de/juplo/kafka/chat/backend/AbstractStorageStrategyIT.java b/src/test/java/de/juplo/kafka/chat/backend/AbstractStorageStrategyIT.java
index 5eaf5417..41e80ed7 100644
--- a/src/test/java/de/juplo/kafka/chat/backend/AbstractStorageStrategyIT.java
+++ b/src/test/java/de/juplo/kafka/chat/backend/AbstractStorageStrategyIT.java
@@ -28,7 +28,9 @@ public abstract class AbstractStorageStrategyIT
 
   protected void stop()
   {
-    getStorageStrategy().write(chathome);
+    getStorageStrategy()
+        .write(chathome)
+        .subscribe();
   }
 
   @Test
-- 
2.20.1