From: Kai Moritz Date: Sat, 7 Jan 2023 20:58:27 +0000 (+0100) Subject: refactor: `LocalJsonFilesStorageStrategy` is now realy reactive X-Git-Tag: wip~78 X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=70ffb4f9a4bf5a3e419e0a39824f7dfb888bb8dc;p=demos%2Fkafka%2Fchat refactor: `LocalJsonFilesStorageStrategy` is now realy reactive --- diff --git a/src/main/java/de/juplo/kafka/chat/backend/ChatBackendConfiguration.java b/src/main/java/de/juplo/kafka/chat/backend/ChatBackendConfiguration.java index adabb922..b590854e 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/ChatBackendConfiguration.java +++ b/src/main/java/de/juplo/kafka/chat/backend/ChatBackendConfiguration.java @@ -23,9 +23,7 @@ public class ChatBackendConfiguration ChatroomFactory chatroomFactory, StorageStrategy storageStrategy) { - return new ChatHome( - storageStrategy.readChatrooms().collectMap(chatroom -> chatroom.getId()).block(), - chatroomFactory); + return new ChatHome(chatroomFactory, storageStrategy.readChatrooms()); } @Bean diff --git a/src/main/java/de/juplo/kafka/chat/backend/domain/ChatHome.java b/src/main/java/de/juplo/kafka/chat/backend/domain/ChatHome.java index d04bd738..ed4d8c7f 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/domain/ChatHome.java +++ b/src/main/java/de/juplo/kafka/chat/backend/domain/ChatHome.java @@ -1,17 +1,25 @@ package de.juplo.kafka.chat.backend.domain; -import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import reactor.core.publisher.Flux; import java.util.*; import java.util.stream.Stream; -@RequiredArgsConstructor +@Slf4j public class ChatHome { private final Map chatrooms; private final ChatroomFactory factory; + public ChatHome(ChatroomFactory factory, Flux chatroomFlux) + { + log.debug("Creating ChatHome with factory: {}", factory); + this.factory = factory; + this.chatrooms = new HashMap<>(); + chatroomFlux.subscribe(chatroom -> chatrooms.put(chatroom.getId(), chatroom)); + } public Chatroom createChatroom(String name) { diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/InMemoryPersistenceStrategy.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/InMemoryPersistenceStrategy.java index 19ef3431..4b522a82 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/InMemoryPersistenceStrategy.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/InMemoryPersistenceStrategy.java @@ -22,6 +22,12 @@ public class InMemoryPersistenceStrategy implements PersistenceStrategy this.messages = messages; } + public InMemoryPersistenceStrategy(Flux messageFlux) + { + log.debug("Creating InMemoryPersistenceStrategy"); + messages = new LinkedHashMap<>(); + messageFlux.subscribe(message -> persistMessage(message)); + } @Override public Mono persistMessage( @@ -30,20 +36,23 @@ public class InMemoryPersistenceStrategy implements PersistenceStrategy String text) { Message message = new Message(key, (long)messages.size(), timestamp, text); + return Mono.justOrEmpty(persistMessage(message)); + } + private Message persistMessage(Message message) + { + Message.MessageKey key = message.getKey(); Message existing = messages.get(key); if (existing != null) { log.info("Message with key {} already exists; {}", key, existing); if (!message.equals(existing)) throw new MessageMutationException(message, existing); - return Mono.empty(); + return null; } messages.put(key, message); - return Mono - .fromSupplier(() -> message) - .log(); + return message; } @Override diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/JsonFilePublisher.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/JsonFilePublisher.java new file mode 100644 index 00000000..e502539e --- /dev/null +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/JsonFilePublisher.java @@ -0,0 +1,118 @@ +package de.juplo.kafka.chat.backend.persistence; + +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.core.JsonToken; +import com.fasterxml.jackson.databind.JavaType; +import com.fasterxml.jackson.databind.ObjectMapper; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.reactivestreams.Publisher; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.NoSuchFileException; +import java.nio.file.Path; +import java.util.Iterator; +import java.util.List; +import java.util.function.Consumer; + + +@RequiredArgsConstructor +@Slf4j +public class JsonFilePublisher implements Publisher +{ + private final Path path; + private final ObjectMapper mapper; + private final JavaType type; + + + @Override + public void subscribe(Subscriber subscriber) + { + log.info("Reading chatrooms from {}", path); + + try + { + JsonParser parser = + mapper.getFactory().createParser(Files.newBufferedReader(path)); + + if (parser.nextToken() != JsonToken.START_ARRAY) + { + throw new IllegalStateException("Expected content to be an array"); + } + + subscriber.onSubscribe(new JsonFileSubscription(subscriber, parser)); + } + catch (NoSuchFileException e) + { + log.info("{} does not exist - starting with empty ChatHome", path); + subscriber.onSubscribe(new ReplaySubscription(subscriber, List.of())); + } + catch (IOException | IllegalStateException e) + { + subscriber.onSubscribe(new ReplaySubscription(subscriber, List.of((s -> s.onError(e))))); + } + } + + @RequiredArgsConstructor + private class JsonFileSubscription implements Subscription + { + private final Subscriber subscriber; + private final JsonParser parser; + + @Override + public void request(long requested) + { + try + { + while (requested > 0 && parser.nextToken() != JsonToken.END_ARRAY) + { + subscriber.onNext(mapper.readValue(parser, type)); + requested--; + } + + if (requested > 0) + subscriber.onComplete(); + } + catch (IOException e) + { + subscriber.onError(e); + } + } + + @Override + public void cancel() {} + } + + private class ReplaySubscription implements Subscription + { + private final Subscriber subscriber; + private final Iterator>> iterator; + + ReplaySubscription( + Subscriber subscriber, + Iterable>> actions) + { + this.subscriber = subscriber; + this.iterator = actions.iterator(); + } + + @Override + public void request(long requested) + { + while (requested > 0 && iterator.hasNext()) + { + iterator.next().accept(subscriber); + requested--; + } + + if (requested > 0) + subscriber.onComplete(); + } + + @Override + public void cancel() {} + } +} diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/LocalJsonFilesStorageStrategy.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/LocalJsonFilesStorageStrategy.java index a90e2ebd..c1958936 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/LocalJsonFilesStorageStrategy.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/LocalJsonFilesStorageStrategy.java @@ -1,27 +1,20 @@ package de.juplo.kafka.chat.backend.persistence; import com.fasterxml.jackson.core.JsonGenerator; -import com.fasterxml.jackson.core.JsonParser; -import com.fasterxml.jackson.core.JsonToken; +import com.fasterxml.jackson.databind.JavaType; import com.fasterxml.jackson.databind.ObjectMapper; import de.juplo.kafka.chat.backend.api.ChatroomTo; import de.juplo.kafka.chat.backend.api.MessageTo; import de.juplo.kafka.chat.backend.domain.Chatroom; import de.juplo.kafka.chat.backend.domain.ChatroomFactory; import de.juplo.kafka.chat.backend.domain.Message; -import de.juplo.kafka.chat.backend.domain.MessageMutationException; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import reactor.core.publisher.Flux; -import reactor.core.publisher.Sinks; import java.io.IOException; import java.nio.file.Files; -import java.nio.file.NoSuchFileException; import java.nio.file.Path; -import java.util.LinkedHashMap; -import java.util.function.Function; -import java.util.stream.Collectors; import static java.nio.file.StandardOpenOption.CREATE; import static java.nio.file.StandardOpenOption.TRUNCATE_EXISTING; @@ -102,59 +95,16 @@ public class LocalJsonFilesStorageStrategy implements StorageStrategy @Override public Flux readChatrooms() { - Path path = chatroomsPath(); - log.info("Reading chatrooms from {}", path); - try - { - JsonParser parser = - mapper - .getFactory() - .createParser(Files.newBufferedReader(path)); - - if (parser.nextToken() != JsonToken.START_ARRAY) - throw new IllegalStateException("Expected content to be an array"); - - Sinks.Many many = Sinks.many().unicast().onBackpressureBuffer(); - - while (parser.nextToken() != JsonToken.END_ARRAY) - { - many - .tryEmitNext(mapper.readValue(parser, ChatroomTo.class)) - .orThrow(); - } - - many.tryEmitComplete().orThrow(); - - return many - .asFlux() - .map(chatroomTo -> - { - LinkedHashMap messages = - readMessages(chatroomTo) - .collect(Collectors.toMap( - Message::getKey, - Function.identity(), - (existing, message) -> - { - if (!message.equals(existing)) - throw new MessageMutationException(message, existing); - return existing; - }, - LinkedHashMap::new)) - .block(); - InMemoryPersistenceStrategy strategy = new InMemoryPersistenceStrategy(messages); - return chatroomFactory.restoreChatroom(chatroomTo.getId(), chatroomTo.getName(), strategy); - }); - } - catch (NoSuchFileException e) - { - log.info("{} does not exist - starting with empty ChatHome", path); - return Flux.empty(); - } - catch (IOException e) - { - throw new RuntimeException(e); - } + JavaType type = mapper.getTypeFactory().constructType(ChatroomTo.class); + return Flux + .from(new JsonFilePublisher(chatroomsPath(), mapper, type)) + .log() + .map(chatroomTo -> + { + InMemoryPersistenceStrategy strategy = + new InMemoryPersistenceStrategy(readMessages(chatroomTo)); + return chatroomFactory.restoreChatroom(chatroomTo.getId(), chatroomTo.getName(), strategy); + }); } @Override @@ -219,43 +169,11 @@ public class LocalJsonFilesStorageStrategy implements StorageStrategy @Override public Flux readMessages(ChatroomTo chatroomTo) { - Path path = chatroomPath(chatroomTo); - log.info("Reading messages for {} from {}", chatroomTo, path); - try - { - JsonParser parser = - mapper - .getFactory() - .createParser(Files.newBufferedReader(path)); - - if (parser.nextToken() != JsonToken.START_ARRAY) - throw new IllegalStateException("Expected content to be an array"); - - Sinks.Many many = Sinks.many().unicast().onBackpressureBuffer(); - - while (parser.nextToken() != JsonToken.END_ARRAY) - { - many - .tryEmitNext(mapper.readValue(parser, MessageTo.class).toMessage()) - .orThrow(); - } - - many.tryEmitComplete().orThrow(); - - return many.asFlux(); - } - catch (NoSuchFileException e) - { - log.info( - "{} does not exist - starting with empty chat for {}", - path, - chatroomTo); - return Flux.empty(); - } - catch (IOException e) - { - throw new RuntimeException(e); - } + JavaType type = mapper.getTypeFactory().constructType(MessageTo.class); + return Flux + .from(new JsonFilePublisher(chatroomPath(chatroomTo), mapper, type)) + .log() + .map(MessageTo::toMessage); } Path chatroomsPath()