X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fchat%2Fbackend%2Fpersistence%2FLocalJsonFilesStorageStrategy.java;h=6c6190876b85405efed0db054764dff18dacd701;hb=c04279dc82b8e662f7a8408ff74f7acd9951cf72;hp=a90e2ebd5576b97dc390b2e25095d31cdfb0f778;hpb=14c6a8c34de8b46b7f1655558727fc1276afb036;p=demos%2Fkafka%2Fchat diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/LocalJsonFilesStorageStrategy.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/LocalJsonFilesStorageStrategy.java index a90e2ebd..6c619087 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/LocalJsonFilesStorageStrategy.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/LocalJsonFilesStorageStrategy.java @@ -1,27 +1,19 @@ package de.juplo.kafka.chat.backend.persistence; import com.fasterxml.jackson.core.JsonGenerator; -import com.fasterxml.jackson.core.JsonParser; -import com.fasterxml.jackson.core.JsonToken; +import com.fasterxml.jackson.databind.JavaType; import com.fasterxml.jackson.databind.ObjectMapper; import de.juplo.kafka.chat.backend.api.ChatroomTo; import de.juplo.kafka.chat.backend.api.MessageTo; import de.juplo.kafka.chat.backend.domain.Chatroom; -import de.juplo.kafka.chat.backend.domain.ChatroomFactory; import de.juplo.kafka.chat.backend.domain.Message; -import de.juplo.kafka.chat.backend.domain.MessageMutationException; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import reactor.core.publisher.Flux; -import reactor.core.publisher.Sinks; import java.io.IOException; import java.nio.file.Files; -import java.nio.file.NoSuchFileException; import java.nio.file.Path; -import java.util.LinkedHashMap; -import java.util.function.Function; -import java.util.stream.Collectors; import static java.nio.file.StandardOpenOption.CREATE; import static java.nio.file.StandardOpenOption.TRUNCATE_EXISTING; @@ -36,7 +28,7 @@ public class LocalJsonFilesStorageStrategy implements StorageStrategy private final Path storagePath; private final ObjectMapper mapper; - private final ChatroomFactory chatroomFactory; + private final InMemoryChatroomFactory chatroomFactory; @Override @@ -102,59 +94,19 @@ public class LocalJsonFilesStorageStrategy implements StorageStrategy @Override public Flux readChatrooms() { - Path path = chatroomsPath(); - log.info("Reading chatrooms from {}", path); - try - { - JsonParser parser = - mapper - .getFactory() - .createParser(Files.newBufferedReader(path)); - - if (parser.nextToken() != JsonToken.START_ARRAY) - throw new IllegalStateException("Expected content to be an array"); - - Sinks.Many many = Sinks.many().unicast().onBackpressureBuffer(); - - while (parser.nextToken() != JsonToken.END_ARRAY) - { - many - .tryEmitNext(mapper.readValue(parser, ChatroomTo.class)) - .orThrow(); - } - - many.tryEmitComplete().orThrow(); - - return many - .asFlux() - .map(chatroomTo -> - { - LinkedHashMap messages = - readMessages(chatroomTo) - .collect(Collectors.toMap( - Message::getKey, - Function.identity(), - (existing, message) -> - { - if (!message.equals(existing)) - throw new MessageMutationException(message, existing); - return existing; - }, - LinkedHashMap::new)) - .block(); - InMemoryPersistenceStrategy strategy = new InMemoryPersistenceStrategy(messages); - return chatroomFactory.restoreChatroom(chatroomTo.getId(), chatroomTo.getName(), strategy); - }); - } - catch (NoSuchFileException e) - { - log.info("{} does not exist - starting with empty ChatHome", path); - return Flux.empty(); - } - catch (IOException e) - { - throw new RuntimeException(e); - } + JavaType type = mapper.getTypeFactory().constructType(ChatroomTo.class); + return Flux + .from(new JsonFilePublisher(chatroomsPath(), mapper, type)) + .log() + .map(chatroomTo -> + { + InMemoryChatroomService chatroomService = + new InMemoryChatroomService(readMessages(chatroomTo)); + return chatroomFactory.restoreChatroom( + chatroomTo.getId(), + chatroomTo.getName(), + chatroomService); + }); } @Override @@ -219,43 +171,11 @@ public class LocalJsonFilesStorageStrategy implements StorageStrategy @Override public Flux readMessages(ChatroomTo chatroomTo) { - Path path = chatroomPath(chatroomTo); - log.info("Reading messages for {} from {}", chatroomTo, path); - try - { - JsonParser parser = - mapper - .getFactory() - .createParser(Files.newBufferedReader(path)); - - if (parser.nextToken() != JsonToken.START_ARRAY) - throw new IllegalStateException("Expected content to be an array"); - - Sinks.Many many = Sinks.many().unicast().onBackpressureBuffer(); - - while (parser.nextToken() != JsonToken.END_ARRAY) - { - many - .tryEmitNext(mapper.readValue(parser, MessageTo.class).toMessage()) - .orThrow(); - } - - many.tryEmitComplete().orThrow(); - - return many.asFlux(); - } - catch (NoSuchFileException e) - { - log.info( - "{} does not exist - starting with empty chat for {}", - path, - chatroomTo); - return Flux.empty(); - } - catch (IOException e) - { - throw new RuntimeException(e); - } + JavaType type = mapper.getTypeFactory().constructType(MessageTo.class); + return Flux + .from(new JsonFilePublisher(chatroomPath(chatroomTo), mapper, type)) + .log() + .map(MessageTo::toMessage); } Path chatroomsPath()