X-Git-Url: http://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fchat%2Fbackend%2Fpersistence%2FLocalJsonFilesStorageStrategy.java;h=706fbe3073eae081bca47c259b5cf180e0632022;hb=42c6495004e3c95808b0889d4456d74bfdcdff70;hp=a90e2ebd5576b97dc390b2e25095d31cdfb0f778;hpb=14c6a8c34de8b46b7f1655558727fc1276afb036;p=demos%2Fkafka%2Fchat diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/LocalJsonFilesStorageStrategy.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/LocalJsonFilesStorageStrategy.java index a90e2ebd..706fbe30 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/LocalJsonFilesStorageStrategy.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/LocalJsonFilesStorageStrategy.java @@ -1,27 +1,19 @@ package de.juplo.kafka.chat.backend.persistence; import com.fasterxml.jackson.core.JsonGenerator; -import com.fasterxml.jackson.core.JsonParser; -import com.fasterxml.jackson.core.JsonToken; +import com.fasterxml.jackson.databind.JavaType; import com.fasterxml.jackson.databind.ObjectMapper; -import de.juplo.kafka.chat.backend.api.ChatroomTo; +import de.juplo.kafka.chat.backend.api.ChatRoomTo; import de.juplo.kafka.chat.backend.api.MessageTo; -import de.juplo.kafka.chat.backend.domain.Chatroom; -import de.juplo.kafka.chat.backend.domain.ChatroomFactory; +import de.juplo.kafka.chat.backend.domain.ChatRoom; import de.juplo.kafka.chat.backend.domain.Message; -import de.juplo.kafka.chat.backend.domain.MessageMutationException; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import reactor.core.publisher.Flux; -import reactor.core.publisher.Sinks; import java.io.IOException; import java.nio.file.Files; -import java.nio.file.NoSuchFileException; import java.nio.file.Path; -import java.util.LinkedHashMap; -import java.util.function.Function; -import java.util.stream.Collectors; import static java.nio.file.StandardOpenOption.CREATE; import static java.nio.file.StandardOpenOption.TRUNCATE_EXISTING; @@ -36,11 +28,11 @@ public class LocalJsonFilesStorageStrategy implements StorageStrategy private final Path storagePath; private final ObjectMapper mapper; - private final ChatroomFactory chatroomFactory; + private final InMemoryChatHomeService service; @Override - public void writeChatrooms(Flux chatroomFlux) + public void writeChatrooms(Flux chatroomFlux) { Path path = chatroomsPath(); log.info("Writing chatrooms to {}", path); @@ -83,7 +75,7 @@ public class LocalJsonFilesStorageStrategy implements StorageStrategy { try { - ChatroomTo chatroomTo = ChatroomTo.from(chatroom); + ChatRoomTo chatroomTo = ChatRoomTo.from(chatroom); generator.writeObject(chatroomTo); writeMessages(chatroomTo, chatroom.getMessages()); } @@ -100,65 +92,25 @@ public class LocalJsonFilesStorageStrategy implements StorageStrategy } @Override - public Flux readChatrooms() + public Flux readChatrooms() { - Path path = chatroomsPath(); - log.info("Reading chatrooms from {}", path); - try - { - JsonParser parser = - mapper - .getFactory() - .createParser(Files.newBufferedReader(path)); - - if (parser.nextToken() != JsonToken.START_ARRAY) - throw new IllegalStateException("Expected content to be an array"); - - Sinks.Many many = Sinks.many().unicast().onBackpressureBuffer(); - - while (parser.nextToken() != JsonToken.END_ARRAY) - { - many - .tryEmitNext(mapper.readValue(parser, ChatroomTo.class)) - .orThrow(); - } - - many.tryEmitComplete().orThrow(); - - return many - .asFlux() - .map(chatroomTo -> - { - LinkedHashMap messages = - readMessages(chatroomTo) - .collect(Collectors.toMap( - Message::getKey, - Function.identity(), - (existing, message) -> - { - if (!message.equals(existing)) - throw new MessageMutationException(message, existing); - return existing; - }, - LinkedHashMap::new)) - .block(); - InMemoryPersistenceStrategy strategy = new InMemoryPersistenceStrategy(messages); - return chatroomFactory.restoreChatroom(chatroomTo.getId(), chatroomTo.getName(), strategy); - }); - } - catch (NoSuchFileException e) - { - log.info("{} does not exist - starting with empty ChatHome", path); - return Flux.empty(); - } - catch (IOException e) - { - throw new RuntimeException(e); - } + JavaType type = mapper.getTypeFactory().constructType(ChatRoomTo.class); + return Flux + .from(new JsonFilePublisher(chatroomsPath(), mapper, type)) + .log() + .map(chatRoomTo -> + { + InMemoryChatRoomService chatroomService = + new InMemoryChatRoomService(readMessages(chatRoomTo)); + return service.restoreChatroom( + chatRoomTo.getId(), + chatRoomTo.getName(), + chatroomService); + }); } @Override - public void writeMessages(ChatroomTo chatroomTo, Flux messageFlux) + public void writeMessages(ChatRoomTo chatroomTo, Flux messageFlux) { Path path = chatroomPath(chatroomTo); log.info("Writing messages for {} to {}", chatroomTo, path); @@ -217,45 +169,13 @@ public class LocalJsonFilesStorageStrategy implements StorageStrategy } @Override - public Flux readMessages(ChatroomTo chatroomTo) + public Flux readMessages(ChatRoomTo chatroomTo) { - Path path = chatroomPath(chatroomTo); - log.info("Reading messages for {} from {}", chatroomTo, path); - try - { - JsonParser parser = - mapper - .getFactory() - .createParser(Files.newBufferedReader(path)); - - if (parser.nextToken() != JsonToken.START_ARRAY) - throw new IllegalStateException("Expected content to be an array"); - - Sinks.Many many = Sinks.many().unicast().onBackpressureBuffer(); - - while (parser.nextToken() != JsonToken.END_ARRAY) - { - many - .tryEmitNext(mapper.readValue(parser, MessageTo.class).toMessage()) - .orThrow(); - } - - many.tryEmitComplete().orThrow(); - - return many.asFlux(); - } - catch (NoSuchFileException e) - { - log.info( - "{} does not exist - starting with empty chat for {}", - path, - chatroomTo); - return Flux.empty(); - } - catch (IOException e) - { - throw new RuntimeException(e); - } + JavaType type = mapper.getTypeFactory().constructType(MessageTo.class); + return Flux + .from(new JsonFilePublisher(chatroomPath(chatroomTo), mapper, type)) + .log() + .map(MessageTo::toMessage); } Path chatroomsPath() @@ -263,7 +183,7 @@ public class LocalJsonFilesStorageStrategy implements StorageStrategy return storagePath.resolve(Path.of(CHATROOMS_FILENAME)); } - Path chatroomPath(ChatroomTo chatroomTo) + Path chatroomPath(ChatRoomTo chatroomTo) { return storagePath.resolve(Path.of(chatroomTo.getId().toString() + ".json")); }