X-Git-Url: http://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fchat%2Fbackend%2Fstorage%2Ffiles%2FFilesStorageStrategy.java;h=aaa615986075f1dc627e38b13ffdb00e239a9d4e;hb=2800b80bb0a02feb70ab9d0c804b2fcc9915f479;hp=7e04a9649b13ba43a654778ece1e362f504381c5;hpb=c786c4a079da27f54e75d22e2d2c2a1693aba078;p=demos%2Fkafka%2Fchat diff --git a/src/main/java/de/juplo/kafka/chat/backend/storage/files/FilesStorageStrategy.java b/src/main/java/de/juplo/kafka/chat/backend/storage/files/FilesStorageStrategy.java index 7e04a964..aaa61598 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/storage/files/FilesStorageStrategy.java +++ b/src/main/java/de/juplo/kafka/chat/backend/storage/files/FilesStorageStrategy.java @@ -17,6 +17,7 @@ import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; import java.util.UUID; +import java.util.logging.Level; import static java.nio.file.StandardOpenOption.CREATE; import static java.nio.file.StandardOpenOption.TRUNCATE_EXISTING; @@ -32,10 +33,12 @@ public class FilesStorageStrategy implements StorageStrategy private final Path storagePath; private final ShardingStrategy shardingStrategy; private final ObjectMapper mapper; + private final Level loggingLevel; + private final boolean showOperatorLine; @Override - public void writeChatRoomInfo(Flux chatRoomInfoFlux) + public Flux writeChatRoomInfo(Flux chatRoomInfoFlux) { Path path = chatroomsPath(); log.info("Writing chatrooms to {}", path); @@ -48,8 +51,11 @@ public class FilesStorageStrategy implements StorageStrategy .getFactory() .createGenerator(Files.newBufferedWriter(path, CREATE, TRUNCATE_EXISTING)); - chatRoomInfoFlux - .log() + return chatRoomInfoFlux + .log( + FilesStorageStrategy.class.getSimpleName(), + loggingLevel, + showOperatorLine) .doFirst(() -> { try @@ -86,8 +92,7 @@ public class FilesStorageStrategy implements StorageStrategy { throw new RuntimeException(e); } - }) - .subscribe(); + }); } catch (IOException e) { @@ -101,7 +106,10 @@ public class FilesStorageStrategy implements StorageStrategy JavaType type = mapper.getTypeFactory().constructType(ChatRoomInfoTo.class); return Flux .from(new JsonFilePublisher(chatroomsPath(), mapper, type)) - .log() + .log( + FilesStorageStrategy.class.getSimpleName(), + loggingLevel, + showOperatorLine) .map(chatRoomInfoTo -> { UUID chatRoomId = chatRoomInfoTo.getId(); @@ -121,7 +129,7 @@ public class FilesStorageStrategy implements StorageStrategy } @Override - public void writeChatRoomData( + public Flux writeChatRoomData( UUID chatRoomId, Flux messageFlux) { @@ -136,8 +144,11 @@ public class FilesStorageStrategy implements StorageStrategy .getFactory() .createGenerator(Files.newBufferedWriter(path, CREATE, TRUNCATE_EXISTING)); - messageFlux - .log() + return messageFlux + .log( + FilesStorageStrategy.class.getSimpleName(), + loggingLevel, + showOperatorLine) .doFirst(() -> { try @@ -174,8 +185,7 @@ public class FilesStorageStrategy implements StorageStrategy { throw new RuntimeException(e); } - }) - .subscribe(); + }); } catch (IOException e) { @@ -189,7 +199,10 @@ public class FilesStorageStrategy implements StorageStrategy JavaType type = mapper.getTypeFactory().constructType(MessageTo.class); return Flux .from(new JsonFilePublisher(chatroomPath(chatRoomId), mapper, type)) - .log() + .log( + FilesStorageStrategy.class.getSimpleName(), + loggingLevel, + showOperatorLine) .map(MessageTo::toMessage); }