X-Git-Url: http://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fchat%2Fbackend%2Fstorage%2Ffiles%2FFilesStorageStrategy.java;h=cdb4f0d720b8c12aed7a4bf6db7df159e739d024;hb=1ce142cb9566a9ab5eacb3d1da7f414722e994e9;hp=7e04a9649b13ba43a654778ece1e362f504381c5;hpb=c786c4a079da27f54e75d22e2d2c2a1693aba078;p=demos%2Fkafka%2Fchat diff --git a/src/main/java/de/juplo/kafka/chat/backend/storage/files/FilesStorageStrategy.java b/src/main/java/de/juplo/kafka/chat/backend/storage/files/FilesStorageStrategy.java index 7e04a964..cdb4f0d7 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/storage/files/FilesStorageStrategy.java +++ b/src/main/java/de/juplo/kafka/chat/backend/storage/files/FilesStorageStrategy.java @@ -35,7 +35,7 @@ public class FilesStorageStrategy implements StorageStrategy @Override - public void writeChatRoomInfo(Flux chatRoomInfoFlux) + public Flux writeChatRoomInfo(Flux chatRoomInfoFlux) { Path path = chatroomsPath(); log.info("Writing chatrooms to {}", path); @@ -48,7 +48,7 @@ public class FilesStorageStrategy implements StorageStrategy .getFactory() .createGenerator(Files.newBufferedWriter(path, CREATE, TRUNCATE_EXISTING)); - chatRoomInfoFlux + return chatRoomInfoFlux .log() .doFirst(() -> { @@ -86,8 +86,7 @@ public class FilesStorageStrategy implements StorageStrategy { throw new RuntimeException(e); } - }) - .subscribe(); + }); } catch (IOException e) { @@ -121,7 +120,7 @@ public class FilesStorageStrategy implements StorageStrategy } @Override - public void writeChatRoomData( + public Flux writeChatRoomData( UUID chatRoomId, Flux messageFlux) { @@ -136,7 +135,7 @@ public class FilesStorageStrategy implements StorageStrategy .getFactory() .createGenerator(Files.newBufferedWriter(path, CREATE, TRUNCATE_EXISTING)); - messageFlux + return messageFlux .log() .doFirst(() -> { @@ -174,8 +173,7 @@ public class FilesStorageStrategy implements StorageStrategy { throw new RuntimeException(e); } - }) - .subscribe(); + }); } catch (IOException e) {