WIP - Ein Versuch (vielleicht Unsinn)
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / implementation / StorageStrategy.java
index 98db975..6d39956 100644 (file)
@@ -33,7 +33,7 @@ public interface StorageStrategy
     Phaser writtenChatRooms = new Phaser(1);
     AtomicInteger numErrors = new AtomicInteger();
 
-    writeChatRoomInfo(
+    Flux<ChatRoomInfo> chatRoomInfoFlux =
         chatHomeService
             .getChatRoomInfo()
             .doOnNext(chatRoomInfo -> writtenChatRooms.register())
@@ -52,7 +52,9 @@ public interface StorageStrategy
                   logFailure(chatRoomId, throwable);
                   numErrors.incrementAndGet();
                   writtenChatRooms.arriveAndDeregister();
-                })));
+                }));
+
+    writeChatRoomInfo(chatRoomInfoFlux);
 
     writtenChatRooms.arriveAndAwaitAdvance();
     if (numErrors.get() > 0)