From: Kai Moritz Date: Mon, 9 Jan 2023 21:42:11 +0000 (+0100) Subject: refactor: Moved `FilesStorageStrategy` in its own package -- Move X-Git-Tag: wip-sharding~46 X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=bc6e23259c4a822a961b00f221baf5f6018d73a6;p=demos%2Fkafka%2Fchat refactor: Moved `FilesStorageStrategy` in its own package -- Move --- diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/filestorage/ChatRoomServiceFactory.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/filestorage/ChatRoomServiceFactory.java deleted file mode 100644 index 42c000bb..00000000 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/filestorage/ChatRoomServiceFactory.java +++ /dev/null @@ -1,11 +0,0 @@ -package de.juplo.kafka.chat.backend.persistence.filestorage; - -import de.juplo.kafka.chat.backend.domain.ChatRoomService; -import de.juplo.kafka.chat.backend.domain.Message; -import reactor.core.publisher.Flux; - - -public interface ChatRoomServiceFactory -{ - ChatRoomService create(Flux messageFlux); -} diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/filestorage/FileStorageStrategy.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/filestorage/FileStorageStrategy.java deleted file mode 100644 index 9952117b..00000000 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/filestorage/FileStorageStrategy.java +++ /dev/null @@ -1,191 +0,0 @@ -package de.juplo.kafka.chat.backend.persistence.filestorage; - -import com.fasterxml.jackson.core.JsonGenerator; -import com.fasterxml.jackson.databind.JavaType; -import com.fasterxml.jackson.databind.ObjectMapper; -import de.juplo.kafka.chat.backend.api.ChatRoomTo; -import de.juplo.kafka.chat.backend.api.MessageTo; -import de.juplo.kafka.chat.backend.domain.ChatRoom; -import de.juplo.kafka.chat.backend.domain.Message; -import de.juplo.kafka.chat.backend.persistence.StorageStrategy; -import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; -import reactor.core.publisher.Flux; - -import java.io.IOException; -import java.nio.file.Files; -import java.nio.file.Path; -import java.time.Clock; - -import static java.nio.file.StandardOpenOption.CREATE; -import static java.nio.file.StandardOpenOption.TRUNCATE_EXISTING; - - -@RequiredArgsConstructor -@Slf4j -public class FileStorageStrategy implements StorageStrategy -{ - public static final String CHATROOMS_FILENAME = "chatrooms.json"; - - - private final Path storagePath; - private final Clock clock; - private final int bufferSize; - private final ChatRoomServiceFactory factory; - private final ObjectMapper mapper; - - - @Override - public void writeChatrooms(Flux chatroomFlux) - { - Path path = chatroomsPath(); - log.info("Writing chatrooms to {}", path); - try - { - Files.createDirectories(storagePath); - - JsonGenerator generator = - mapper - .getFactory() - .createGenerator(Files.newBufferedWriter(path, CREATE, TRUNCATE_EXISTING)); - - chatroomFlux - .log() - .doFirst(() -> - { - try - { - generator.useDefaultPrettyPrinter(); - generator.writeStartArray(); - } - catch (IOException e) - { - throw new RuntimeException(e); - } - }) - .doOnTerminate(() -> - { - try - { - generator.writeEndArray(); - generator.close(); - } - catch (IOException e) - { - throw new RuntimeException(e); - } - }) - .subscribe(chatroom -> - { - try - { - ChatRoomTo chatroomTo = ChatRoomTo.from(chatroom); - generator.writeObject(chatroomTo); - writeMessages(chatroomTo, chatroom.getMessages()); - } - catch (IOException e) - { - throw new RuntimeException(e); - } - }); - } - catch (IOException e) - { - throw new RuntimeException(e); - } - } - - @Override - public Flux readChatrooms() - { - JavaType type = mapper.getTypeFactory().constructType(ChatRoomTo.class); - return Flux - .from(new JsonFilePublisher(chatroomsPath(), mapper, type)) - .log() - .map(chatRoomTo -> new ChatRoom( - chatRoomTo.getId(), - chatRoomTo.getName(), - clock, - factory.create(readMessages(chatRoomTo)), - bufferSize)); - } - - @Override - public void writeMessages(ChatRoomTo chatroomTo, Flux messageFlux) - { - Path path = chatroomPath(chatroomTo); - log.info("Writing messages for {} to {}", chatroomTo, path); - try - { - Files.createDirectories(storagePath); - - JsonGenerator generator = - mapper - .getFactory() - .createGenerator(Files.newBufferedWriter(path, CREATE, TRUNCATE_EXISTING)); - - messageFlux - .log() - .doFirst(() -> - { - try - { - generator.useDefaultPrettyPrinter(); - generator.writeStartArray(); - } - catch (IOException e) - { - throw new RuntimeException(e); - } - }) - .doOnTerminate(() -> - { - try - { - generator.writeEndArray(); - generator.close(); - } - catch (IOException e) - { - throw new RuntimeException(e); - } - }) - .subscribe(message -> - { - try - { - MessageTo messageTo = MessageTo.from(message); - generator.writeObject(messageTo); - } - catch (IOException e) - { - throw new RuntimeException(e); - } - }); - } - catch (IOException e) - { - throw new RuntimeException(e); - } - } - - @Override - public Flux readMessages(ChatRoomTo chatroomTo) - { - JavaType type = mapper.getTypeFactory().constructType(MessageTo.class); - return Flux - .from(new JsonFilePublisher(chatroomPath(chatroomTo), mapper, type)) - .log() - .map(MessageTo::toMessage); - } - - Path chatroomsPath() - { - return storagePath.resolve(Path.of(CHATROOMS_FILENAME)); - } - - Path chatroomPath(ChatRoomTo chatroomTo) - { - return storagePath.resolve(Path.of(chatroomTo.getId().toString() + ".json")); - } -} diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/filestorage/JsonFilePublisher.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/filestorage/JsonFilePublisher.java deleted file mode 100644 index 3824898f..00000000 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/filestorage/JsonFilePublisher.java +++ /dev/null @@ -1,118 +0,0 @@ -package de.juplo.kafka.chat.backend.persistence.filestorage; - -import com.fasterxml.jackson.core.JsonParser; -import com.fasterxml.jackson.core.JsonToken; -import com.fasterxml.jackson.databind.JavaType; -import com.fasterxml.jackson.databind.ObjectMapper; -import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; -import org.reactivestreams.Publisher; -import org.reactivestreams.Subscriber; -import org.reactivestreams.Subscription; - -import java.io.IOException; -import java.nio.file.Files; -import java.nio.file.NoSuchFileException; -import java.nio.file.Path; -import java.util.Iterator; -import java.util.List; -import java.util.function.Consumer; - - -@RequiredArgsConstructor -@Slf4j -public class JsonFilePublisher implements Publisher -{ - private final Path path; - private final ObjectMapper mapper; - private final JavaType type; - - - @Override - public void subscribe(Subscriber subscriber) - { - log.info("Reading chatrooms from {}", path); - - try - { - JsonParser parser = - mapper.getFactory().createParser(Files.newBufferedReader(path)); - - if (parser.nextToken() != JsonToken.START_ARRAY) - { - throw new IllegalStateException("Expected content to be an array"); - } - - subscriber.onSubscribe(new JsonFileSubscription(subscriber, parser)); - } - catch (NoSuchFileException e) - { - log.info("{} does not exist - starting with empty ChatHome", path); - subscriber.onSubscribe(new ReplaySubscription(subscriber, List.of())); - } - catch (IOException | IllegalStateException e) - { - subscriber.onSubscribe(new ReplaySubscription(subscriber, List.of((s -> s.onError(e))))); - } - } - - @RequiredArgsConstructor - private class JsonFileSubscription implements Subscription - { - private final Subscriber subscriber; - private final JsonParser parser; - - @Override - public void request(long requested) - { - try - { - while (requested > 0 && parser.nextToken() != JsonToken.END_ARRAY) - { - subscriber.onNext(mapper.readValue(parser, type)); - requested--; - } - - if (requested > 0) - subscriber.onComplete(); - } - catch (IOException e) - { - subscriber.onError(e); - } - } - - @Override - public void cancel() {} - } - - private class ReplaySubscription implements Subscription - { - private final Subscriber subscriber; - private final Iterator>> iterator; - - ReplaySubscription( - Subscriber subscriber, - Iterable>> actions) - { - this.subscriber = subscriber; - this.iterator = actions.iterator(); - } - - @Override - public void request(long requested) - { - while (requested > 0 && iterator.hasNext()) - { - iterator.next().accept(subscriber); - requested--; - } - - if (requested > 0) - subscriber.onComplete(); - } - - @Override - public void cancel() {} - } -} diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/files/ChatRoomServiceFactory.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/files/ChatRoomServiceFactory.java new file mode 100644 index 00000000..42c000bb --- /dev/null +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/files/ChatRoomServiceFactory.java @@ -0,0 +1,11 @@ +package de.juplo.kafka.chat.backend.persistence.filestorage; + +import de.juplo.kafka.chat.backend.domain.ChatRoomService; +import de.juplo.kafka.chat.backend.domain.Message; +import reactor.core.publisher.Flux; + + +public interface ChatRoomServiceFactory +{ + ChatRoomService create(Flux messageFlux); +} diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/files/FilesStorageStrategy.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/files/FilesStorageStrategy.java new file mode 100644 index 00000000..9952117b --- /dev/null +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/files/FilesStorageStrategy.java @@ -0,0 +1,191 @@ +package de.juplo.kafka.chat.backend.persistence.filestorage; + +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.JavaType; +import com.fasterxml.jackson.databind.ObjectMapper; +import de.juplo.kafka.chat.backend.api.ChatRoomTo; +import de.juplo.kafka.chat.backend.api.MessageTo; +import de.juplo.kafka.chat.backend.domain.ChatRoom; +import de.juplo.kafka.chat.backend.domain.Message; +import de.juplo.kafka.chat.backend.persistence.StorageStrategy; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import reactor.core.publisher.Flux; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.time.Clock; + +import static java.nio.file.StandardOpenOption.CREATE; +import static java.nio.file.StandardOpenOption.TRUNCATE_EXISTING; + + +@RequiredArgsConstructor +@Slf4j +public class FileStorageStrategy implements StorageStrategy +{ + public static final String CHATROOMS_FILENAME = "chatrooms.json"; + + + private final Path storagePath; + private final Clock clock; + private final int bufferSize; + private final ChatRoomServiceFactory factory; + private final ObjectMapper mapper; + + + @Override + public void writeChatrooms(Flux chatroomFlux) + { + Path path = chatroomsPath(); + log.info("Writing chatrooms to {}", path); + try + { + Files.createDirectories(storagePath); + + JsonGenerator generator = + mapper + .getFactory() + .createGenerator(Files.newBufferedWriter(path, CREATE, TRUNCATE_EXISTING)); + + chatroomFlux + .log() + .doFirst(() -> + { + try + { + generator.useDefaultPrettyPrinter(); + generator.writeStartArray(); + } + catch (IOException e) + { + throw new RuntimeException(e); + } + }) + .doOnTerminate(() -> + { + try + { + generator.writeEndArray(); + generator.close(); + } + catch (IOException e) + { + throw new RuntimeException(e); + } + }) + .subscribe(chatroom -> + { + try + { + ChatRoomTo chatroomTo = ChatRoomTo.from(chatroom); + generator.writeObject(chatroomTo); + writeMessages(chatroomTo, chatroom.getMessages()); + } + catch (IOException e) + { + throw new RuntimeException(e); + } + }); + } + catch (IOException e) + { + throw new RuntimeException(e); + } + } + + @Override + public Flux readChatrooms() + { + JavaType type = mapper.getTypeFactory().constructType(ChatRoomTo.class); + return Flux + .from(new JsonFilePublisher(chatroomsPath(), mapper, type)) + .log() + .map(chatRoomTo -> new ChatRoom( + chatRoomTo.getId(), + chatRoomTo.getName(), + clock, + factory.create(readMessages(chatRoomTo)), + bufferSize)); + } + + @Override + public void writeMessages(ChatRoomTo chatroomTo, Flux messageFlux) + { + Path path = chatroomPath(chatroomTo); + log.info("Writing messages for {} to {}", chatroomTo, path); + try + { + Files.createDirectories(storagePath); + + JsonGenerator generator = + mapper + .getFactory() + .createGenerator(Files.newBufferedWriter(path, CREATE, TRUNCATE_EXISTING)); + + messageFlux + .log() + .doFirst(() -> + { + try + { + generator.useDefaultPrettyPrinter(); + generator.writeStartArray(); + } + catch (IOException e) + { + throw new RuntimeException(e); + } + }) + .doOnTerminate(() -> + { + try + { + generator.writeEndArray(); + generator.close(); + } + catch (IOException e) + { + throw new RuntimeException(e); + } + }) + .subscribe(message -> + { + try + { + MessageTo messageTo = MessageTo.from(message); + generator.writeObject(messageTo); + } + catch (IOException e) + { + throw new RuntimeException(e); + } + }); + } + catch (IOException e) + { + throw new RuntimeException(e); + } + } + + @Override + public Flux readMessages(ChatRoomTo chatroomTo) + { + JavaType type = mapper.getTypeFactory().constructType(MessageTo.class); + return Flux + .from(new JsonFilePublisher(chatroomPath(chatroomTo), mapper, type)) + .log() + .map(MessageTo::toMessage); + } + + Path chatroomsPath() + { + return storagePath.resolve(Path.of(CHATROOMS_FILENAME)); + } + + Path chatroomPath(ChatRoomTo chatroomTo) + { + return storagePath.resolve(Path.of(chatroomTo.getId().toString() + ".json")); + } +} diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/files/JsonFilePublisher.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/files/JsonFilePublisher.java new file mode 100644 index 00000000..3824898f --- /dev/null +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/files/JsonFilePublisher.java @@ -0,0 +1,118 @@ +package de.juplo.kafka.chat.backend.persistence.filestorage; + +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.core.JsonToken; +import com.fasterxml.jackson.databind.JavaType; +import com.fasterxml.jackson.databind.ObjectMapper; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.reactivestreams.Publisher; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.NoSuchFileException; +import java.nio.file.Path; +import java.util.Iterator; +import java.util.List; +import java.util.function.Consumer; + + +@RequiredArgsConstructor +@Slf4j +public class JsonFilePublisher implements Publisher +{ + private final Path path; + private final ObjectMapper mapper; + private final JavaType type; + + + @Override + public void subscribe(Subscriber subscriber) + { + log.info("Reading chatrooms from {}", path); + + try + { + JsonParser parser = + mapper.getFactory().createParser(Files.newBufferedReader(path)); + + if (parser.nextToken() != JsonToken.START_ARRAY) + { + throw new IllegalStateException("Expected content to be an array"); + } + + subscriber.onSubscribe(new JsonFileSubscription(subscriber, parser)); + } + catch (NoSuchFileException e) + { + log.info("{} does not exist - starting with empty ChatHome", path); + subscriber.onSubscribe(new ReplaySubscription(subscriber, List.of())); + } + catch (IOException | IllegalStateException e) + { + subscriber.onSubscribe(new ReplaySubscription(subscriber, List.of((s -> s.onError(e))))); + } + } + + @RequiredArgsConstructor + private class JsonFileSubscription implements Subscription + { + private final Subscriber subscriber; + private final JsonParser parser; + + @Override + public void request(long requested) + { + try + { + while (requested > 0 && parser.nextToken() != JsonToken.END_ARRAY) + { + subscriber.onNext(mapper.readValue(parser, type)); + requested--; + } + + if (requested > 0) + subscriber.onComplete(); + } + catch (IOException e) + { + subscriber.onError(e); + } + } + + @Override + public void cancel() {} + } + + private class ReplaySubscription implements Subscription + { + private final Subscriber subscriber; + private final Iterator>> iterator; + + ReplaySubscription( + Subscriber subscriber, + Iterable>> actions) + { + this.subscriber = subscriber; + this.iterator = actions.iterator(); + } + + @Override + public void request(long requested) + { + while (requested > 0 && iterator.hasNext()) + { + iterator.next().accept(subscriber); + requested--; + } + + if (requested > 0) + subscriber.onComplete(); + } + + @Override + public void cancel() {} + } +} diff --git a/src/test/java/de/juplo/kafka/chat/backend/persistence/InMemoryWithFileStorageStrategyIT.java b/src/test/java/de/juplo/kafka/chat/backend/persistence/InMemoryWithFileStorageStrategyIT.java deleted file mode 100644 index e10aae03..00000000 --- a/src/test/java/de/juplo/kafka/chat/backend/persistence/InMemoryWithFileStorageStrategyIT.java +++ /dev/null @@ -1,84 +0,0 @@ -package de.juplo.kafka.chat.backend.persistence; - -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.SerializationFeature; -import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; -import de.juplo.kafka.chat.backend.domain.ChatHomeService; -import de.juplo.kafka.chat.backend.persistence.filestorage.FileStorageStrategy; -import de.juplo.kafka.chat.backend.persistence.inmemory.InMemoryChatHomeService; -import de.juplo.kafka.chat.backend.persistence.inmemory.InMemoryChatRoomService; -import lombok.extern.slf4j.Slf4j; -import org.junit.jupiter.api.BeforeEach; - -import java.io.IOException; -import java.nio.file.Files; -import java.nio.file.Path; -import java.nio.file.Paths; -import java.time.Clock; -import java.util.function.Supplier; - - -@Slf4j -public class InMemoryWithFileStorageStrategyIT extends AbstractStorageStrategyIT -{ - final static Path path = Paths.get("target","local-json-files"); - - final Clock clock; - final ObjectMapper mapper; - final FileStorageStrategy storageStrategy; - - - public InMemoryWithFileStorageStrategyIT() - { - clock = Clock.systemDefaultZone(); - mapper = new ObjectMapper(); - mapper.registerModule(new JavaTimeModule()); - mapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS); - storageStrategy = new FileStorageStrategy( - path, - clock, - 8, - messageFlux -> new InMemoryChatRoomService(messageFlux), - mapper); - } - - - @Override - protected StorageStrategy getStorageStrategy() - { - return storageStrategy; - } - - @Override - protected Supplier chatHomeServiceSupplier() - { - return () -> new InMemoryChatHomeService(getStorageStrategy().readChatrooms(), clock, 8); - } - - @BeforeEach - void reset() throws Exception - { - if (Files.exists(path)) - { - Files - .walk(path) - .forEach(file -> - { - try - { - if (!file.equals(path)) - { - log.debug("Deleting file {}", file); - Files.delete(file); - } - } - catch (IOException e) - { - throw new RuntimeException(e); - } - }); - log.debug("Deleting data-directory {}", path); - Files.delete(path); - } - } -} diff --git a/src/test/java/de/juplo/kafka/chat/backend/persistence/InMemoryWithFilesStorageStrategyIT.java b/src/test/java/de/juplo/kafka/chat/backend/persistence/InMemoryWithFilesStorageStrategyIT.java new file mode 100644 index 00000000..e10aae03 --- /dev/null +++ b/src/test/java/de/juplo/kafka/chat/backend/persistence/InMemoryWithFilesStorageStrategyIT.java @@ -0,0 +1,84 @@ +package de.juplo.kafka.chat.backend.persistence; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.SerializationFeature; +import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; +import de.juplo.kafka.chat.backend.domain.ChatHomeService; +import de.juplo.kafka.chat.backend.persistence.filestorage.FileStorageStrategy; +import de.juplo.kafka.chat.backend.persistence.inmemory.InMemoryChatHomeService; +import de.juplo.kafka.chat.backend.persistence.inmemory.InMemoryChatRoomService; +import lombok.extern.slf4j.Slf4j; +import org.junit.jupiter.api.BeforeEach; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.time.Clock; +import java.util.function.Supplier; + + +@Slf4j +public class InMemoryWithFileStorageStrategyIT extends AbstractStorageStrategyIT +{ + final static Path path = Paths.get("target","local-json-files"); + + final Clock clock; + final ObjectMapper mapper; + final FileStorageStrategy storageStrategy; + + + public InMemoryWithFileStorageStrategyIT() + { + clock = Clock.systemDefaultZone(); + mapper = new ObjectMapper(); + mapper.registerModule(new JavaTimeModule()); + mapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS); + storageStrategy = new FileStorageStrategy( + path, + clock, + 8, + messageFlux -> new InMemoryChatRoomService(messageFlux), + mapper); + } + + + @Override + protected StorageStrategy getStorageStrategy() + { + return storageStrategy; + } + + @Override + protected Supplier chatHomeServiceSupplier() + { + return () -> new InMemoryChatHomeService(getStorageStrategy().readChatrooms(), clock, 8); + } + + @BeforeEach + void reset() throws Exception + { + if (Files.exists(path)) + { + Files + .walk(path) + .forEach(file -> + { + try + { + if (!file.equals(path)) + { + log.debug("Deleting file {}", file); + Files.delete(file); + } + } + catch (IOException e) + { + throw new RuntimeException(e); + } + }); + log.debug("Deleting data-directory {}", path); + Files.delete(path); + } + } +}