From: Kai Moritz Date: Fri, 9 Feb 2024 16:50:26 +0000 (+0100) Subject: WIP - Ein Versuch (vielleicht Unsinn) X-Git-Tag: rebase--2024-02-18--18-12 X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=46cafb65876ccec33ef4e9948fad2e4aa526039a;p=demos%2Fkafka%2Fchat WIP - Ein Versuch (vielleicht Unsinn) --- diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/StorageStrategy.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/StorageStrategy.java index 98db9750..6d39956b 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/StorageStrategy.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/StorageStrategy.java @@ -33,7 +33,7 @@ public interface StorageStrategy Phaser writtenChatRooms = new Phaser(1); AtomicInteger numErrors = new AtomicInteger(); - writeChatRoomInfo( + Flux chatRoomInfoFlux = chatHomeService .getChatRoomInfo() .doOnNext(chatRoomInfo -> writtenChatRooms.register()) @@ -52,7 +52,9 @@ public interface StorageStrategy logFailure(chatRoomId, throwable); numErrors.incrementAndGet(); writtenChatRooms.arriveAndDeregister(); - }))); + })); + + writeChatRoomInfo(chatRoomInfoFlux); writtenChatRooms.arriveAndAwaitAdvance(); if (numErrors.get() > 0) diff --git a/src/main/java/de/juplo/kafka/chat/backend/storage/files/FilesStorageStrategy.java b/src/main/java/de/juplo/kafka/chat/backend/storage/files/FilesStorageStrategy.java index cb7dd31e..00bb881d 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/storage/files/FilesStorageStrategy.java +++ b/src/main/java/de/juplo/kafka/chat/backend/storage/files/FilesStorageStrategy.java @@ -62,24 +62,24 @@ public class FilesStorageStrategy implements StorageStrategy throw new RuntimeException(e); } }) - .doOnTerminate(() -> + .doOnNext(chatRoomInfo -> { try { - generator.writeEndArray(); - generator.close(); + ChatRoomInfoTo chatRoomInfoTo = ChatRoomInfoTo.from(chatRoomInfo); + generator.writeObject(chatRoomInfoTo); } catch (IOException e) { throw new RuntimeException(e); } }) - .subscribe(chatRoomInfo -> + .doOnTerminate(() -> { try { - ChatRoomInfoTo chatRoomInfoTo = ChatRoomInfoTo.from(chatRoomInfo); - generator.writeObject(chatRoomInfoTo); + generator.writeEndArray(); + generator.close(); } catch (IOException e) { diff --git a/src/main/java/de/juplo/kafka/chat/backend/storage/mongodb/MongoDbStorageStrategy.java b/src/main/java/de/juplo/kafka/chat/backend/storage/mongodb/MongoDbStorageStrategy.java index 3f9ff209..1d742a5d 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/storage/mongodb/MongoDbStorageStrategy.java +++ b/src/main/java/de/juplo/kafka/chat/backend/storage/mongodb/MongoDbStorageStrategy.java @@ -25,8 +25,7 @@ public class MongoDbStorageStrategy implements StorageStrategy { chatRoomInfoFlux .map(ChatRoomTo::from) - .flatMap(chatroomTo -> chatRoomRepository.save(chatroomTo)) - .subscribe(); + .flatMap(chatroomTo -> chatRoomRepository.save(chatroomTo)); } @Override