X-Git-Url: http://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fchat%2Fbackend%2Fstorage%2Ffiles%2FFilesStorageStrategy.java;h=7c1fef02f825f68a87050a0e427f3136050b607d;hb=7107fd0b4795d95b6c7f49cf4e765b82c16d016e;hp=cb7dd31e27364f564535452ed3f63de932b3112e;hpb=386f950f8b329bf2b956fa7896e270a39037967d;p=demos%2Fkafka%2Fchat diff --git a/src/main/java/de/juplo/kafka/chat/backend/storage/files/FilesStorageStrategy.java b/src/main/java/de/juplo/kafka/chat/backend/storage/files/FilesStorageStrategy.java index cb7dd31e..7c1fef02 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/storage/files/FilesStorageStrategy.java +++ b/src/main/java/de/juplo/kafka/chat/backend/storage/files/FilesStorageStrategy.java @@ -17,6 +17,7 @@ import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; import java.util.UUID; +import java.util.logging.Level; import static java.nio.file.StandardOpenOption.CREATE; import static java.nio.file.StandardOpenOption.TRUNCATE_EXISTING; @@ -32,10 +33,13 @@ public class FilesStorageStrategy implements StorageStrategy private final Path storagePath; private final ShardingStrategy shardingStrategy; private final ObjectMapper mapper; + private final String loggingCategory = FilesStorageStrategy.class.getSimpleName(); + private final Level loggingLevel; + private final boolean showOperatorLine; @Override - public void writeChatRoomInfo(Flux chatRoomInfoFlux) + public Flux writeChatRoomInfo(Flux chatRoomInfoFlux) { Path path = chatroomsPath(); log.info("Writing chatrooms to {}", path); @@ -48,8 +52,11 @@ public class FilesStorageStrategy implements StorageStrategy .getFactory() .createGenerator(Files.newBufferedWriter(path, CREATE, TRUNCATE_EXISTING)); - chatRoomInfoFlux - .log() + return chatRoomInfoFlux + .log( + loggingCategory, + loggingLevel, + showOperatorLine) .doFirst(() -> { try @@ -74,12 +81,13 @@ public class FilesStorageStrategy implements StorageStrategy throw new RuntimeException(e); } }) - .subscribe(chatRoomInfo -> + .map(chatRoomInfo -> { try { ChatRoomInfoTo chatRoomInfoTo = ChatRoomInfoTo.from(chatRoomInfo); generator.writeObject(chatRoomInfoTo); + return chatRoomInfo; } catch (IOException e) { @@ -99,7 +107,10 @@ public class FilesStorageStrategy implements StorageStrategy JavaType type = mapper.getTypeFactory().constructType(ChatRoomInfoTo.class); return Flux .from(new JsonFilePublisher(chatroomsPath(), mapper, type)) - .log() + .log( + loggingCategory, + loggingLevel, + showOperatorLine) .map(chatRoomInfoTo -> { UUID chatRoomId = chatRoomInfoTo.getId(); @@ -119,7 +130,7 @@ public class FilesStorageStrategy implements StorageStrategy } @Override - public void writeChatRoomData( + public Flux writeChatRoomData( UUID chatRoomId, Flux messageFlux) { @@ -134,8 +145,11 @@ public class FilesStorageStrategy implements StorageStrategy .getFactory() .createGenerator(Files.newBufferedWriter(path, CREATE, TRUNCATE_EXISTING)); - messageFlux - .log() + return messageFlux + .log( + loggingCategory, + loggingLevel, + showOperatorLine) .doFirst(() -> { try @@ -160,12 +174,13 @@ public class FilesStorageStrategy implements StorageStrategy throw new RuntimeException(e); } }) - .subscribe(message -> + .map(message -> { try { MessageTo messageTo = MessageTo.from(message); generator.writeObject(messageTo); + return message; } catch (IOException e) { @@ -185,7 +200,10 @@ public class FilesStorageStrategy implements StorageStrategy JavaType type = mapper.getTypeFactory().constructType(MessageTo.class); return Flux .from(new JsonFilePublisher(chatroomPath(chatRoomId), mapper, type)) - .log() + .log( + loggingCategory, + loggingLevel, + showOperatorLine) .map(MessageTo::toMessage); }