WIP - Ein Versuch (vielleicht Unsinn) rebase--2024-02-18--18-12
authorKai Moritz <kai@juplo.de>
Fri, 9 Feb 2024 16:50:26 +0000 (17:50 +0100)
committerKai Moritz <kai@juplo.de>
Sun, 18 Feb 2024 16:41:00 +0000 (17:41 +0100)
src/main/java/de/juplo/kafka/chat/backend/implementation/StorageStrategy.java
src/main/java/de/juplo/kafka/chat/backend/storage/files/FilesStorageStrategy.java
src/main/java/de/juplo/kafka/chat/backend/storage/mongodb/MongoDbStorageStrategy.java

index 98db975..6d39956 100644 (file)
@@ -33,7 +33,7 @@ public interface StorageStrategy
     Phaser writtenChatRooms = new Phaser(1);
     AtomicInteger numErrors = new AtomicInteger();
 
-    writeChatRoomInfo(
+    Flux<ChatRoomInfo> chatRoomInfoFlux =
         chatHomeService
             .getChatRoomInfo()
             .doOnNext(chatRoomInfo -> writtenChatRooms.register())
@@ -52,7 +52,9 @@ public interface StorageStrategy
                   logFailure(chatRoomId, throwable);
                   numErrors.incrementAndGet();
                   writtenChatRooms.arriveAndDeregister();
-                })));
+                }));
+
+    writeChatRoomInfo(chatRoomInfoFlux);
 
     writtenChatRooms.arriveAndAwaitAdvance();
     if (numErrors.get() > 0)
index cb7dd31..00bb881 100644 (file)
@@ -62,24 +62,24 @@ public class FilesStorageStrategy implements StorageStrategy
               throw new RuntimeException(e);
             }
           })
-          .doOnTerminate(() ->
+          .doOnNext(chatRoomInfo ->
           {
             try
             {
-              generator.writeEndArray();
-              generator.close();
+              ChatRoomInfoTo chatRoomInfoTo = ChatRoomInfoTo.from(chatRoomInfo);
+              generator.writeObject(chatRoomInfoTo);
             }
             catch (IOException e)
             {
               throw new RuntimeException(e);
             }
           })
-          .subscribe(chatRoomInfo ->
+          .doOnTerminate(() ->
           {
             try
             {
-              ChatRoomInfoTo chatRoomInfoTo = ChatRoomInfoTo.from(chatRoomInfo);
-              generator.writeObject(chatRoomInfoTo);
+              generator.writeEndArray();
+              generator.close();
             }
             catch (IOException e)
             {
index 3f9ff20..1d742a5 100644 (file)
@@ -25,8 +25,7 @@ public class MongoDbStorageStrategy implements StorageStrategy
   {
     chatRoomInfoFlux
         .map(ChatRoomTo::from)
-        .flatMap(chatroomTo -> chatRoomRepository.save(chatroomTo))
-        .subscribe();
+        .flatMap(chatroomTo -> chatRoomRepository.save(chatroomTo));
   }
 
   @Override