X-Git-Url: http://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fchat%2Fbackend%2Fstorage%2Ffiles%2FFilesStorageStrategy.java;h=7c1fef02f825f68a87050a0e427f3136050b607d;hb=7107fd0b4795d95b6c7f49cf4e765b82c16d016e;hp=887aea24272e0fa8ba88c2e684a891da5edda028;hpb=9ac32a888a2a03e7d40679226213c3b3b67d586e;p=demos%2Fkafka%2Fchat diff --git a/src/main/java/de/juplo/kafka/chat/backend/storage/files/FilesStorageStrategy.java b/src/main/java/de/juplo/kafka/chat/backend/storage/files/FilesStorageStrategy.java index 887aea24..7c1fef02 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/storage/files/FilesStorageStrategy.java +++ b/src/main/java/de/juplo/kafka/chat/backend/storage/files/FilesStorageStrategy.java @@ -7,8 +7,8 @@ import de.juplo.kafka.chat.backend.api.ChatRoomInfoTo; import de.juplo.kafka.chat.backend.api.MessageTo; import de.juplo.kafka.chat.backend.domain.ChatRoomInfo; import de.juplo.kafka.chat.backend.domain.Message; -import de.juplo.kafka.chat.backend.persistence.StorageStrategy; -import de.juplo.kafka.chat.backend.persistence.ShardingStrategy; +import de.juplo.kafka.chat.backend.implementation.StorageStrategy; +import de.juplo.kafka.chat.backend.implementation.ShardingStrategy; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import reactor.core.publisher.Flux; @@ -17,6 +17,7 @@ import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; import java.util.UUID; +import java.util.logging.Level; import static java.nio.file.StandardOpenOption.CREATE; import static java.nio.file.StandardOpenOption.TRUNCATE_EXISTING; @@ -32,10 +33,13 @@ public class FilesStorageStrategy implements StorageStrategy private final Path storagePath; private final ShardingStrategy shardingStrategy; private final ObjectMapper mapper; + private final String loggingCategory = FilesStorageStrategy.class.getSimpleName(); + private final Level loggingLevel; + private final boolean showOperatorLine; @Override - public void writeChatRoomInfo(Flux chatRoomInfoFlux) + public Flux writeChatRoomInfo(Flux chatRoomInfoFlux) { Path path = chatroomsPath(); log.info("Writing chatrooms to {}", path); @@ -48,8 +52,11 @@ public class FilesStorageStrategy implements StorageStrategy .getFactory() .createGenerator(Files.newBufferedWriter(path, CREATE, TRUNCATE_EXISTING)); - chatRoomInfoFlux - .log() + return chatRoomInfoFlux + .log( + loggingCategory, + loggingLevel, + showOperatorLine) .doFirst(() -> { try @@ -74,12 +81,13 @@ public class FilesStorageStrategy implements StorageStrategy throw new RuntimeException(e); } }) - .subscribe(chatRoomInfo -> + .map(chatRoomInfo -> { try { ChatRoomInfoTo chatRoomInfoTo = ChatRoomInfoTo.from(chatRoomInfo); generator.writeObject(chatRoomInfoTo); + return chatRoomInfo; } catch (IOException e) { @@ -99,7 +107,10 @@ public class FilesStorageStrategy implements StorageStrategy JavaType type = mapper.getTypeFactory().constructType(ChatRoomInfoTo.class); return Flux .from(new JsonFilePublisher(chatroomsPath(), mapper, type)) - .log() + .log( + loggingCategory, + loggingLevel, + showOperatorLine) .map(chatRoomInfoTo -> { UUID chatRoomId = chatRoomInfoTo.getId(); @@ -119,7 +130,7 @@ public class FilesStorageStrategy implements StorageStrategy } @Override - public void writeChatRoomData( + public Flux writeChatRoomData( UUID chatRoomId, Flux messageFlux) { @@ -134,8 +145,11 @@ public class FilesStorageStrategy implements StorageStrategy .getFactory() .createGenerator(Files.newBufferedWriter(path, CREATE, TRUNCATE_EXISTING)); - messageFlux - .log() + return messageFlux + .log( + loggingCategory, + loggingLevel, + showOperatorLine) .doFirst(() -> { try @@ -160,12 +174,13 @@ public class FilesStorageStrategy implements StorageStrategy throw new RuntimeException(e); } }) - .subscribe(message -> + .map(message -> { try { MessageTo messageTo = MessageTo.from(message); generator.writeObject(messageTo); + return message; } catch (IOException e) { @@ -185,7 +200,10 @@ public class FilesStorageStrategy implements StorageStrategy JavaType type = mapper.getTypeFactory().constructType(MessageTo.class); return Flux .from(new JsonFilePublisher(chatroomPath(chatRoomId), mapper, type)) - .log() + .log( + loggingCategory, + loggingLevel, + showOperatorLine) .map(MessageTo::toMessage); }