+++ /dev/null
-package de.juplo.kafka.chat.backend.persistence.filestorage;
-
-import de.juplo.kafka.chat.backend.domain.ChatRoomService;
-import de.juplo.kafka.chat.backend.domain.Message;
-import reactor.core.publisher.Flux;
-
-
-public interface ChatRoomServiceFactory
-{
- ChatRoomService create(Flux<Message> messageFlux);
-}
+++ /dev/null
-package de.juplo.kafka.chat.backend.persistence.filestorage;
-
-import com.fasterxml.jackson.core.JsonGenerator;
-import com.fasterxml.jackson.databind.JavaType;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import de.juplo.kafka.chat.backend.api.ChatRoomTo;
-import de.juplo.kafka.chat.backend.api.MessageTo;
-import de.juplo.kafka.chat.backend.domain.ChatRoom;
-import de.juplo.kafka.chat.backend.domain.Message;
-import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
-import lombok.RequiredArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
-import reactor.core.publisher.Flux;
-
-import java.io.IOException;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.time.Clock;
-
-import static java.nio.file.StandardOpenOption.CREATE;
-import static java.nio.file.StandardOpenOption.TRUNCATE_EXISTING;
-
-
-@RequiredArgsConstructor
-@Slf4j
-public class FileStorageStrategy implements StorageStrategy
-{
- public static final String CHATROOMS_FILENAME = "chatrooms.json";
-
-
- private final Path storagePath;
- private final Clock clock;
- private final int bufferSize;
- private final ChatRoomServiceFactory factory;
- private final ObjectMapper mapper;
-
-
- @Override
- public void writeChatrooms(Flux<ChatRoom> chatroomFlux)
- {
- Path path = chatroomsPath();
- log.info("Writing chatrooms to {}", path);
- try
- {
- Files.createDirectories(storagePath);
-
- JsonGenerator generator =
- mapper
- .getFactory()
- .createGenerator(Files.newBufferedWriter(path, CREATE, TRUNCATE_EXISTING));
-
- chatroomFlux
- .log()
- .doFirst(() ->
- {
- try
- {
- generator.useDefaultPrettyPrinter();
- generator.writeStartArray();
- }
- catch (IOException e)
- {
- throw new RuntimeException(e);
- }
- })
- .doOnTerminate(() ->
- {
- try
- {
- generator.writeEndArray();
- generator.close();
- }
- catch (IOException e)
- {
- throw new RuntimeException(e);
- }
- })
- .subscribe(chatroom ->
- {
- try
- {
- ChatRoomTo chatroomTo = ChatRoomTo.from(chatroom);
- generator.writeObject(chatroomTo);
- writeMessages(chatroomTo, chatroom.getMessages());
- }
- catch (IOException e)
- {
- throw new RuntimeException(e);
- }
- });
- }
- catch (IOException e)
- {
- throw new RuntimeException(e);
- }
- }
-
- @Override
- public Flux<ChatRoom> readChatrooms()
- {
- JavaType type = mapper.getTypeFactory().constructType(ChatRoomTo.class);
- return Flux
- .from(new JsonFilePublisher<ChatRoomTo>(chatroomsPath(), mapper, type))
- .log()
- .map(chatRoomTo -> new ChatRoom(
- chatRoomTo.getId(),
- chatRoomTo.getName(),
- clock,
- factory.create(readMessages(chatRoomTo)),
- bufferSize));
- }
-
- @Override
- public void writeMessages(ChatRoomTo chatroomTo, Flux<Message> messageFlux)
- {
- Path path = chatroomPath(chatroomTo);
- log.info("Writing messages for {} to {}", chatroomTo, path);
- try
- {
- Files.createDirectories(storagePath);
-
- JsonGenerator generator =
- mapper
- .getFactory()
- .createGenerator(Files.newBufferedWriter(path, CREATE, TRUNCATE_EXISTING));
-
- messageFlux
- .log()
- .doFirst(() ->
- {
- try
- {
- generator.useDefaultPrettyPrinter();
- generator.writeStartArray();
- }
- catch (IOException e)
- {
- throw new RuntimeException(e);
- }
- })
- .doOnTerminate(() ->
- {
- try
- {
- generator.writeEndArray();
- generator.close();
- }
- catch (IOException e)
- {
- throw new RuntimeException(e);
- }
- })
- .subscribe(message ->
- {
- try
- {
- MessageTo messageTo = MessageTo.from(message);
- generator.writeObject(messageTo);
- }
- catch (IOException e)
- {
- throw new RuntimeException(e);
- }
- });
- }
- catch (IOException e)
- {
- throw new RuntimeException(e);
- }
- }
-
- @Override
- public Flux<Message> readMessages(ChatRoomTo chatroomTo)
- {
- JavaType type = mapper.getTypeFactory().constructType(MessageTo.class);
- return Flux
- .from(new JsonFilePublisher<MessageTo>(chatroomPath(chatroomTo), mapper, type))
- .log()
- .map(MessageTo::toMessage);
- }
-
- Path chatroomsPath()
- {
- return storagePath.resolve(Path.of(CHATROOMS_FILENAME));
- }
-
- Path chatroomPath(ChatRoomTo chatroomTo)
- {
- return storagePath.resolve(Path.of(chatroomTo.getId().toString() + ".json"));
- }
-}
+++ /dev/null
-package de.juplo.kafka.chat.backend.persistence.filestorage;
-
-import com.fasterxml.jackson.core.JsonParser;
-import com.fasterxml.jackson.core.JsonToken;
-import com.fasterxml.jackson.databind.JavaType;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import lombok.RequiredArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
-import org.reactivestreams.Publisher;
-import org.reactivestreams.Subscriber;
-import org.reactivestreams.Subscription;
-
-import java.io.IOException;
-import java.nio.file.Files;
-import java.nio.file.NoSuchFileException;
-import java.nio.file.Path;
-import java.util.Iterator;
-import java.util.List;
-import java.util.function.Consumer;
-
-
-@RequiredArgsConstructor
-@Slf4j
-public class JsonFilePublisher<T> implements Publisher<T>
-{
- private final Path path;
- private final ObjectMapper mapper;
- private final JavaType type;
-
-
- @Override
- public void subscribe(Subscriber<? super T> subscriber)
- {
- log.info("Reading chatrooms from {}", path);
-
- try
- {
- JsonParser parser =
- mapper.getFactory().createParser(Files.newBufferedReader(path));
-
- if (parser.nextToken() != JsonToken.START_ARRAY)
- {
- throw new IllegalStateException("Expected content to be an array");
- }
-
- subscriber.onSubscribe(new JsonFileSubscription(subscriber, parser));
- }
- catch (NoSuchFileException e)
- {
- log.info("{} does not exist - starting with empty ChatHome", path);
- subscriber.onSubscribe(new ReplaySubscription(subscriber, List.of()));
- }
- catch (IOException | IllegalStateException e)
- {
- subscriber.onSubscribe(new ReplaySubscription(subscriber, List.of((s -> s.onError(e)))));
- }
- }
-
- @RequiredArgsConstructor
- private class JsonFileSubscription implements Subscription
- {
- private final Subscriber<? super T> subscriber;
- private final JsonParser parser;
-
- @Override
- public void request(long requested)
- {
- try
- {
- while (requested > 0 && parser.nextToken() != JsonToken.END_ARRAY)
- {
- subscriber.onNext(mapper.readValue(parser, type));
- requested--;
- }
-
- if (requested > 0)
- subscriber.onComplete();
- }
- catch (IOException e)
- {
- subscriber.onError(e);
- }
- }
-
- @Override
- public void cancel() {}
- }
-
- private class ReplaySubscription implements Subscription
- {
- private final Subscriber<? super T> subscriber;
- private final Iterator<Consumer<Subscriber<? super T>>> iterator;
-
- ReplaySubscription(
- Subscriber<? super T> subscriber,
- Iterable<Consumer<Subscriber<? super T>>> actions)
- {
- this.subscriber = subscriber;
- this.iterator = actions.iterator();
- }
-
- @Override
- public void request(long requested)
- {
- while (requested > 0 && iterator.hasNext())
- {
- iterator.next().accept(subscriber);
- requested--;
- }
-
- if (requested > 0)
- subscriber.onComplete();
- }
-
- @Override
- public void cancel() {}
- }
-}
--- /dev/null
+package de.juplo.kafka.chat.backend.persistence.filestorage;
+
+import de.juplo.kafka.chat.backend.domain.ChatRoomService;
+import de.juplo.kafka.chat.backend.domain.Message;
+import reactor.core.publisher.Flux;
+
+
+public interface ChatRoomServiceFactory
+{
+ ChatRoomService create(Flux<Message> messageFlux);
+}
--- /dev/null
+package de.juplo.kafka.chat.backend.persistence.filestorage;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.JavaType;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import de.juplo.kafka.chat.backend.api.ChatRoomTo;
+import de.juplo.kafka.chat.backend.api.MessageTo;
+import de.juplo.kafka.chat.backend.domain.ChatRoom;
+import de.juplo.kafka.chat.backend.domain.Message;
+import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import reactor.core.publisher.Flux;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.time.Clock;
+
+import static java.nio.file.StandardOpenOption.CREATE;
+import static java.nio.file.StandardOpenOption.TRUNCATE_EXISTING;
+
+
+@RequiredArgsConstructor
+@Slf4j
+public class FileStorageStrategy implements StorageStrategy
+{
+ public static final String CHATROOMS_FILENAME = "chatrooms.json";
+
+
+ private final Path storagePath;
+ private final Clock clock;
+ private final int bufferSize;
+ private final ChatRoomServiceFactory factory;
+ private final ObjectMapper mapper;
+
+
+ @Override
+ public void writeChatrooms(Flux<ChatRoom> chatroomFlux)
+ {
+ Path path = chatroomsPath();
+ log.info("Writing chatrooms to {}", path);
+ try
+ {
+ Files.createDirectories(storagePath);
+
+ JsonGenerator generator =
+ mapper
+ .getFactory()
+ .createGenerator(Files.newBufferedWriter(path, CREATE, TRUNCATE_EXISTING));
+
+ chatroomFlux
+ .log()
+ .doFirst(() ->
+ {
+ try
+ {
+ generator.useDefaultPrettyPrinter();
+ generator.writeStartArray();
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException(e);
+ }
+ })
+ .doOnTerminate(() ->
+ {
+ try
+ {
+ generator.writeEndArray();
+ generator.close();
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException(e);
+ }
+ })
+ .subscribe(chatroom ->
+ {
+ try
+ {
+ ChatRoomTo chatroomTo = ChatRoomTo.from(chatroom);
+ generator.writeObject(chatroomTo);
+ writeMessages(chatroomTo, chatroom.getMessages());
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException(e);
+ }
+ });
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public Flux<ChatRoom> readChatrooms()
+ {
+ JavaType type = mapper.getTypeFactory().constructType(ChatRoomTo.class);
+ return Flux
+ .from(new JsonFilePublisher<ChatRoomTo>(chatroomsPath(), mapper, type))
+ .log()
+ .map(chatRoomTo -> new ChatRoom(
+ chatRoomTo.getId(),
+ chatRoomTo.getName(),
+ clock,
+ factory.create(readMessages(chatRoomTo)),
+ bufferSize));
+ }
+
+ @Override
+ public void writeMessages(ChatRoomTo chatroomTo, Flux<Message> messageFlux)
+ {
+ Path path = chatroomPath(chatroomTo);
+ log.info("Writing messages for {} to {}", chatroomTo, path);
+ try
+ {
+ Files.createDirectories(storagePath);
+
+ JsonGenerator generator =
+ mapper
+ .getFactory()
+ .createGenerator(Files.newBufferedWriter(path, CREATE, TRUNCATE_EXISTING));
+
+ messageFlux
+ .log()
+ .doFirst(() ->
+ {
+ try
+ {
+ generator.useDefaultPrettyPrinter();
+ generator.writeStartArray();
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException(e);
+ }
+ })
+ .doOnTerminate(() ->
+ {
+ try
+ {
+ generator.writeEndArray();
+ generator.close();
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException(e);
+ }
+ })
+ .subscribe(message ->
+ {
+ try
+ {
+ MessageTo messageTo = MessageTo.from(message);
+ generator.writeObject(messageTo);
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException(e);
+ }
+ });
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public Flux<Message> readMessages(ChatRoomTo chatroomTo)
+ {
+ JavaType type = mapper.getTypeFactory().constructType(MessageTo.class);
+ return Flux
+ .from(new JsonFilePublisher<MessageTo>(chatroomPath(chatroomTo), mapper, type))
+ .log()
+ .map(MessageTo::toMessage);
+ }
+
+ Path chatroomsPath()
+ {
+ return storagePath.resolve(Path.of(CHATROOMS_FILENAME));
+ }
+
+ Path chatroomPath(ChatRoomTo chatroomTo)
+ {
+ return storagePath.resolve(Path.of(chatroomTo.getId().toString() + ".json"));
+ }
+}
--- /dev/null
+package de.juplo.kafka.chat.backend.persistence.filestorage;
+
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonToken;
+import com.fasterxml.jackson.databind.JavaType;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.reactivestreams.Publisher;
+import org.reactivestreams.Subscriber;
+import org.reactivestreams.Subscription;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.NoSuchFileException;
+import java.nio.file.Path;
+import java.util.Iterator;
+import java.util.List;
+import java.util.function.Consumer;
+
+
+@RequiredArgsConstructor
+@Slf4j
+public class JsonFilePublisher<T> implements Publisher<T>
+{
+ private final Path path;
+ private final ObjectMapper mapper;
+ private final JavaType type;
+
+
+ @Override
+ public void subscribe(Subscriber<? super T> subscriber)
+ {
+ log.info("Reading chatrooms from {}", path);
+
+ try
+ {
+ JsonParser parser =
+ mapper.getFactory().createParser(Files.newBufferedReader(path));
+
+ if (parser.nextToken() != JsonToken.START_ARRAY)
+ {
+ throw new IllegalStateException("Expected content to be an array");
+ }
+
+ subscriber.onSubscribe(new JsonFileSubscription(subscriber, parser));
+ }
+ catch (NoSuchFileException e)
+ {
+ log.info("{} does not exist - starting with empty ChatHome", path);
+ subscriber.onSubscribe(new ReplaySubscription(subscriber, List.of()));
+ }
+ catch (IOException | IllegalStateException e)
+ {
+ subscriber.onSubscribe(new ReplaySubscription(subscriber, List.of((s -> s.onError(e)))));
+ }
+ }
+
+ @RequiredArgsConstructor
+ private class JsonFileSubscription implements Subscription
+ {
+ private final Subscriber<? super T> subscriber;
+ private final JsonParser parser;
+
+ @Override
+ public void request(long requested)
+ {
+ try
+ {
+ while (requested > 0 && parser.nextToken() != JsonToken.END_ARRAY)
+ {
+ subscriber.onNext(mapper.readValue(parser, type));
+ requested--;
+ }
+
+ if (requested > 0)
+ subscriber.onComplete();
+ }
+ catch (IOException e)
+ {
+ subscriber.onError(e);
+ }
+ }
+
+ @Override
+ public void cancel() {}
+ }
+
+ private class ReplaySubscription implements Subscription
+ {
+ private final Subscriber<? super T> subscriber;
+ private final Iterator<Consumer<Subscriber<? super T>>> iterator;
+
+ ReplaySubscription(
+ Subscriber<? super T> subscriber,
+ Iterable<Consumer<Subscriber<? super T>>> actions)
+ {
+ this.subscriber = subscriber;
+ this.iterator = actions.iterator();
+ }
+
+ @Override
+ public void request(long requested)
+ {
+ while (requested > 0 && iterator.hasNext())
+ {
+ iterator.next().accept(subscriber);
+ requested--;
+ }
+
+ if (requested > 0)
+ subscriber.onComplete();
+ }
+
+ @Override
+ public void cancel() {}
+ }
+}
+++ /dev/null
-package de.juplo.kafka.chat.backend.persistence;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.SerializationFeature;
-import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
-import de.juplo.kafka.chat.backend.domain.ChatHomeService;
-import de.juplo.kafka.chat.backend.persistence.filestorage.FileStorageStrategy;
-import de.juplo.kafka.chat.backend.persistence.inmemory.InMemoryChatHomeService;
-import de.juplo.kafka.chat.backend.persistence.inmemory.InMemoryChatRoomService;
-import lombok.extern.slf4j.Slf4j;
-import org.junit.jupiter.api.BeforeEach;
-
-import java.io.IOException;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.time.Clock;
-import java.util.function.Supplier;
-
-
-@Slf4j
-public class InMemoryWithFileStorageStrategyIT extends AbstractStorageStrategyIT
-{
- final static Path path = Paths.get("target","local-json-files");
-
- final Clock clock;
- final ObjectMapper mapper;
- final FileStorageStrategy storageStrategy;
-
-
- public InMemoryWithFileStorageStrategyIT()
- {
- clock = Clock.systemDefaultZone();
- mapper = new ObjectMapper();
- mapper.registerModule(new JavaTimeModule());
- mapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
- storageStrategy =new FileStorageStrategy(
- path,
- clock,
- 8,
- messageFlux -> new InMemoryChatRoomService(messageFlux),
- mapper);
-
- }
-
-
- @Override
- protected StorageStrategy getStorageStrategy()
- {
- return storageStrategy;
- }
-
- @Override
- protected Supplier<ChatHomeService> chatHomeServiceSupplier()
- {
- return () -> new InMemoryChatHomeService(getStorageStrategy().readChatrooms(), clock, 8);
- }
-
- @BeforeEach
- void reset() throws Exception
- {
- if (Files.exists(path))
- {
- Files
- .walk(path)
- .forEach(file ->
- {
- try
- {
- if (!file.equals(path))
- {
- log.debug("Deleting file {}", file);
- Files.delete(file);
- }
- }
- catch (IOException e)
- {
- throw new RuntimeException(e);
- }
- });
- log.debug("Deleting data-directory {}", path);
- Files.delete(path);
- }
- }
-}
--- /dev/null
+package de.juplo.kafka.chat.backend.persistence;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializationFeature;
+import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
+import de.juplo.kafka.chat.backend.domain.ChatHomeService;
+import de.juplo.kafka.chat.backend.persistence.filestorage.FileStorageStrategy;
+import de.juplo.kafka.chat.backend.persistence.inmemory.InMemoryChatHomeService;
+import de.juplo.kafka.chat.backend.persistence.inmemory.InMemoryChatRoomService;
+import lombok.extern.slf4j.Slf4j;
+import org.junit.jupiter.api.BeforeEach;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.time.Clock;
+import java.util.function.Supplier;
+
+
+@Slf4j
+public class InMemoryWithFileStorageStrategyIT extends AbstractStorageStrategyIT
+{
+ final static Path path = Paths.get("target","local-json-files");
+
+ final Clock clock;
+ final ObjectMapper mapper;
+ final FileStorageStrategy storageStrategy;
+
+
+ public InMemoryWithFileStorageStrategyIT()
+ {
+ clock = Clock.systemDefaultZone();
+ mapper = new ObjectMapper();
+ mapper.registerModule(new JavaTimeModule());
+ mapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
+ storageStrategy =new FileStorageStrategy(
+ path,
+ clock,
+ 8,
+ messageFlux -> new InMemoryChatRoomService(messageFlux),
+ mapper);
+
+ }
+
+
+ @Override
+ protected StorageStrategy getStorageStrategy()
+ {
+ return storageStrategy;
+ }
+
+ @Override
+ protected Supplier<ChatHomeService> chatHomeServiceSupplier()
+ {
+ return () -> new InMemoryChatHomeService(getStorageStrategy().readChatrooms(), clock, 8);
+ }
+
+ @BeforeEach
+ void reset() throws Exception
+ {
+ if (Files.exists(path))
+ {
+ Files
+ .walk(path)
+ .forEach(file ->
+ {
+ try
+ {
+ if (!file.equals(path))
+ {
+ log.debug("Deleting file {}", file);
+ Files.delete(file);
+ }
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException(e);
+ }
+ });
+ log.debug("Deleting data-directory {}", path);
+ Files.delete(path);
+ }
+ }
+}