+++ /dev/null
-package de.juplo.kafka.chat.backend.persistence;
-
-import de.juplo.kafka.chat.backend.domain.ChatRoom;
-import de.juplo.kafka.chat.backend.domain.ChatHomeService;
-import lombok.extern.slf4j.Slf4j;
-import reactor.core.publisher.Flux;
-import reactor.core.publisher.Mono;
-
-import java.time.Clock;
-import java.util.HashMap;
-import java.util.LinkedHashMap;
-import java.util.Map;
-import java.util.UUID;
-
-
-@Slf4j
-public class InMemoryChatHomeService implements ChatHomeService<InMemoryChatRoomService>
-{
- private final Map<UUID, ChatRoom> chatrooms;
- private final Clock clock;
- private final int bufferSize;
-
-
- public InMemoryChatHomeService(
- Flux<ChatRoom> chatroomFlux,
- Clock clock,
- int bufferSize)
- {
- log.debug("Creating ChatHomeService with buffer-size {} (for created ChatRoom's)", bufferSize);
- this.chatrooms = new HashMap<>();
- chatroomFlux.subscribe(chatroom -> chatrooms.put(chatroom.getId(), chatroom));
- this.clock = clock;
- this.bufferSize = bufferSize;
- }
-
- @Override
- public Mono<ChatRoom> createChatRoom(UUID id, String name)
- {
- InMemoryChatRoomService service =
- new InMemoryChatRoomService(new LinkedHashMap<>());
- ChatRoom chatRoom = new ChatRoom(id, name, clock, service, bufferSize);
- chatrooms.put(chatRoom.getId(), chatRoom);
- return Mono.just(chatRoom);
- }
-
- @Override
- public Mono<ChatRoom> getChatRoom(UUID id)
- {
- return Mono.justOrEmpty(chatrooms.get(id));
- }
-
- @Override
- public Flux<ChatRoom> getChatRooms()
- {
- return Flux.fromStream(chatrooms.values().stream());
- }
-}
+++ /dev/null
-package de.juplo.kafka.chat.backend.persistence;
-
-import de.juplo.kafka.chat.backend.domain.Message;
-import de.juplo.kafka.chat.backend.domain.ChatRoomService;
-import lombok.extern.slf4j.Slf4j;
-import reactor.core.publisher.Flux;
-import reactor.core.publisher.Mono;
-
-import java.time.LocalDateTime;
-import java.util.LinkedHashMap;
-
-
-@Slf4j
-public class InMemoryChatRoomService implements ChatRoomService
-{
- private final LinkedHashMap<Message.MessageKey, Message> messages;
-
-
- public InMemoryChatRoomService(LinkedHashMap<Message.MessageKey, Message> messages)
- {
- this.messages = messages;
- }
-
- public InMemoryChatRoomService(Flux<Message> messageFlux)
- {
- log.debug("Creating InMemoryChatroomService");
- messages = new LinkedHashMap<>();
- messageFlux.subscribe(message -> messages.put(message.getKey(), message));
- }
-
- @Override
- public Message persistMessage(
- Message.MessageKey key,
- LocalDateTime timestamp,
- String text)
- {
- Message message = new Message(key, (long)messages.size(), timestamp, text);
- messages.put(message.getKey(), message);
- return message;
- }
-
- @Override
- public Mono<Message> getMessage(Message.MessageKey key)
- {
- return Mono.fromSupplier(() -> messages.get(key));
- }
-
- @Override
- public Flux<Message> getMessages(long first, long last)
- {
- return Flux.fromStream(messages
- .values()
- .stream()
- .filter(message ->
- {
- long serial = message.getSerialNumber();
- return serial >= first && serial <= last;
- }));
- }
-}
+++ /dev/null
-package de.juplo.kafka.chat.backend.persistence;
-
-import com.fasterxml.jackson.core.JsonParser;
-import com.fasterxml.jackson.core.JsonToken;
-import com.fasterxml.jackson.databind.JavaType;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import lombok.RequiredArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
-import org.reactivestreams.Publisher;
-import org.reactivestreams.Subscriber;
-import org.reactivestreams.Subscription;
-
-import java.io.IOException;
-import java.nio.file.Files;
-import java.nio.file.NoSuchFileException;
-import java.nio.file.Path;
-import java.util.Iterator;
-import java.util.List;
-import java.util.function.Consumer;
-
-
-@RequiredArgsConstructor
-@Slf4j
-public class JsonFilePublisher<T> implements Publisher<T>
-{
- private final Path path;
- private final ObjectMapper mapper;
- private final JavaType type;
-
-
- @Override
- public void subscribe(Subscriber<? super T> subscriber)
- {
- log.info("Reading chatrooms from {}", path);
-
- try
- {
- JsonParser parser =
- mapper.getFactory().createParser(Files.newBufferedReader(path));
-
- if (parser.nextToken() != JsonToken.START_ARRAY)
- {
- throw new IllegalStateException("Expected content to be an array");
- }
-
- subscriber.onSubscribe(new JsonFileSubscription(subscriber, parser));
- }
- catch (NoSuchFileException e)
- {
- log.info("{} does not exist - starting with empty ChatHome", path);
- subscriber.onSubscribe(new ReplaySubscription(subscriber, List.of()));
- }
- catch (IOException | IllegalStateException e)
- {
- subscriber.onSubscribe(new ReplaySubscription(subscriber, List.of((s -> s.onError(e)))));
- }
- }
-
- @RequiredArgsConstructor
- private class JsonFileSubscription implements Subscription
- {
- private final Subscriber<? super T> subscriber;
- private final JsonParser parser;
-
- @Override
- public void request(long requested)
- {
- try
- {
- while (requested > 0 && parser.nextToken() != JsonToken.END_ARRAY)
- {
- subscriber.onNext(mapper.readValue(parser, type));
- requested--;
- }
-
- if (requested > 0)
- subscriber.onComplete();
- }
- catch (IOException e)
- {
- subscriber.onError(e);
- }
- }
-
- @Override
- public void cancel() {}
- }
-
- private class ReplaySubscription implements Subscription
- {
- private final Subscriber<? super T> subscriber;
- private final Iterator<Consumer<Subscriber<? super T>>> iterator;
-
- ReplaySubscription(
- Subscriber<? super T> subscriber,
- Iterable<Consumer<Subscriber<? super T>>> actions)
- {
- this.subscriber = subscriber;
- this.iterator = actions.iterator();
- }
-
- @Override
- public void request(long requested)
- {
- while (requested > 0 && iterator.hasNext())
- {
- iterator.next().accept(subscriber);
- requested--;
- }
-
- if (requested > 0)
- subscriber.onComplete();
- }
-
- @Override
- public void cancel() {}
- }
-}
+++ /dev/null
-package de.juplo.kafka.chat.backend.persistence;
-
-import com.fasterxml.jackson.core.JsonGenerator;
-import com.fasterxml.jackson.databind.JavaType;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import de.juplo.kafka.chat.backend.api.ChatRoomTo;
-import de.juplo.kafka.chat.backend.api.MessageTo;
-import de.juplo.kafka.chat.backend.domain.ChatRoom;
-import de.juplo.kafka.chat.backend.domain.Message;
-import lombok.RequiredArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
-import reactor.core.publisher.Flux;
-
-import java.io.IOException;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.time.Clock;
-
-import static java.nio.file.StandardOpenOption.CREATE;
-import static java.nio.file.StandardOpenOption.TRUNCATE_EXISTING;
-
-
-@RequiredArgsConstructor
-@Slf4j
-public class LocalJsonFilesStorageStrategy implements StorageStrategy
-{
- public static final String CHATROOMS_FILENAME = "chatrooms.json";
-
-
- private final Path storagePath;
- private final Clock clock;
- private final int bufferSize;
- private final ObjectMapper mapper;
-
-
- @Override
- public void writeChatrooms(Flux<ChatRoom> chatroomFlux)
- {
- Path path = chatroomsPath();
- log.info("Writing chatrooms to {}", path);
- try
- {
- Files.createDirectories(storagePath);
-
- JsonGenerator generator =
- mapper
- .getFactory()
- .createGenerator(Files.newBufferedWriter(path, CREATE, TRUNCATE_EXISTING));
-
- chatroomFlux
- .log()
- .doFirst(() ->
- {
- try
- {
- generator.useDefaultPrettyPrinter();
- generator.writeStartArray();
- }
- catch (IOException e)
- {
- throw new RuntimeException(e);
- }
- })
- .doOnTerminate(() ->
- {
- try
- {
- generator.writeEndArray();
- generator.close();
- }
- catch (IOException e)
- {
- throw new RuntimeException(e);
- }
- })
- .subscribe(chatroom ->
- {
- try
- {
- ChatRoomTo chatroomTo = ChatRoomTo.from(chatroom);
- generator.writeObject(chatroomTo);
- writeMessages(chatroomTo, chatroom.getMessages());
- }
- catch (IOException e)
- {
- throw new RuntimeException(e);
- }
- });
- }
- catch (IOException e)
- {
- throw new RuntimeException(e);
- }
- }
-
- @Override
- public Flux<ChatRoom> readChatrooms()
- {
- JavaType type = mapper.getTypeFactory().constructType(ChatRoomTo.class);
- return Flux
- .from(new JsonFilePublisher<ChatRoomTo>(chatroomsPath(), mapper, type))
- .log()
- .map(chatRoomTo ->
- {
- InMemoryChatRoomService chatroomService =
- new InMemoryChatRoomService(readMessages(chatRoomTo));
- return new ChatRoom(
- chatRoomTo.getId(),
- chatRoomTo.getName(),
- clock,
- chatroomService,
- bufferSize);
- });
- }
-
- @Override
- public void writeMessages(ChatRoomTo chatroomTo, Flux<Message> messageFlux)
- {
- Path path = chatroomPath(chatroomTo);
- log.info("Writing messages for {} to {}", chatroomTo, path);
- try
- {
- Files.createDirectories(storagePath);
-
- JsonGenerator generator =
- mapper
- .getFactory()
- .createGenerator(Files.newBufferedWriter(path, CREATE, TRUNCATE_EXISTING));
-
- messageFlux
- .log()
- .doFirst(() ->
- {
- try
- {
- generator.useDefaultPrettyPrinter();
- generator.writeStartArray();
- }
- catch (IOException e)
- {
- throw new RuntimeException(e);
- }
- })
- .doOnTerminate(() ->
- {
- try
- {
- generator.writeEndArray();
- generator.close();
- }
- catch (IOException e)
- {
- throw new RuntimeException(e);
- }
- })
- .subscribe(message ->
- {
- try
- {
- MessageTo messageTo = MessageTo.from(message);
- generator.writeObject(messageTo);
- }
- catch (IOException e)
- {
- throw new RuntimeException(e);
- }
- });
- }
- catch (IOException e)
- {
- throw new RuntimeException(e);
- }
- }
-
- @Override
- public Flux<Message> readMessages(ChatRoomTo chatroomTo)
- {
- JavaType type = mapper.getTypeFactory().constructType(MessageTo.class);
- return Flux
- .from(new JsonFilePublisher<MessageTo>(chatroomPath(chatroomTo), mapper, type))
- .log()
- .map(MessageTo::toMessage);
- }
-
- Path chatroomsPath()
- {
- return storagePath.resolve(Path.of(CHATROOMS_FILENAME));
- }
-
- Path chatroomPath(ChatRoomTo chatroomTo)
- {
- return storagePath.resolve(Path.of(chatroomTo.getId().toString() + ".json"));
- }
-}
--- /dev/null
+package de.juplo.kafka.chat.backend.persistence;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.JavaType;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import de.juplo.kafka.chat.backend.api.ChatRoomTo;
+import de.juplo.kafka.chat.backend.api.MessageTo;
+import de.juplo.kafka.chat.backend.domain.ChatRoom;
+import de.juplo.kafka.chat.backend.domain.Message;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import reactor.core.publisher.Flux;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.time.Clock;
+
+import static java.nio.file.StandardOpenOption.CREATE;
+import static java.nio.file.StandardOpenOption.TRUNCATE_EXISTING;
+
+
+@RequiredArgsConstructor
+@Slf4j
+public class LocalJsonFilesStorageStrategy implements StorageStrategy
+{
+ public static final String CHATROOMS_FILENAME = "chatrooms.json";
+
+
+ private final Path storagePath;
+ private final Clock clock;
+ private final int bufferSize;
+ private final ObjectMapper mapper;
+
+
+ @Override
+ public void writeChatrooms(Flux<ChatRoom> chatroomFlux)
+ {
+ Path path = chatroomsPath();
+ log.info("Writing chatrooms to {}", path);
+ try
+ {
+ Files.createDirectories(storagePath);
+
+ JsonGenerator generator =
+ mapper
+ .getFactory()
+ .createGenerator(Files.newBufferedWriter(path, CREATE, TRUNCATE_EXISTING));
+
+ chatroomFlux
+ .log()
+ .doFirst(() ->
+ {
+ try
+ {
+ generator.useDefaultPrettyPrinter();
+ generator.writeStartArray();
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException(e);
+ }
+ })
+ .doOnTerminate(() ->
+ {
+ try
+ {
+ generator.writeEndArray();
+ generator.close();
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException(e);
+ }
+ })
+ .subscribe(chatroom ->
+ {
+ try
+ {
+ ChatRoomTo chatroomTo = ChatRoomTo.from(chatroom);
+ generator.writeObject(chatroomTo);
+ writeMessages(chatroomTo, chatroom.getMessages());
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException(e);
+ }
+ });
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public Flux<ChatRoom> readChatrooms()
+ {
+ JavaType type = mapper.getTypeFactory().constructType(ChatRoomTo.class);
+ return Flux
+ .from(new JsonFilePublisher<ChatRoomTo>(chatroomsPath(), mapper, type))
+ .log()
+ .map(chatRoomTo ->
+ {
+ InMemoryChatRoomService chatroomService =
+ new InMemoryChatRoomService(readMessages(chatRoomTo));
+ return new ChatRoom(
+ chatRoomTo.getId(),
+ chatRoomTo.getName(),
+ clock,
+ chatroomService,
+ bufferSize);
+ });
+ }
+
+ @Override
+ public void writeMessages(ChatRoomTo chatroomTo, Flux<Message> messageFlux)
+ {
+ Path path = chatroomPath(chatroomTo);
+ log.info("Writing messages for {} to {}", chatroomTo, path);
+ try
+ {
+ Files.createDirectories(storagePath);
+
+ JsonGenerator generator =
+ mapper
+ .getFactory()
+ .createGenerator(Files.newBufferedWriter(path, CREATE, TRUNCATE_EXISTING));
+
+ messageFlux
+ .log()
+ .doFirst(() ->
+ {
+ try
+ {
+ generator.useDefaultPrettyPrinter();
+ generator.writeStartArray();
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException(e);
+ }
+ })
+ .doOnTerminate(() ->
+ {
+ try
+ {
+ generator.writeEndArray();
+ generator.close();
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException(e);
+ }
+ })
+ .subscribe(message ->
+ {
+ try
+ {
+ MessageTo messageTo = MessageTo.from(message);
+ generator.writeObject(messageTo);
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException(e);
+ }
+ });
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public Flux<Message> readMessages(ChatRoomTo chatroomTo)
+ {
+ JavaType type = mapper.getTypeFactory().constructType(MessageTo.class);
+ return Flux
+ .from(new JsonFilePublisher<MessageTo>(chatroomPath(chatroomTo), mapper, type))
+ .log()
+ .map(MessageTo::toMessage);
+ }
+
+ Path chatroomsPath()
+ {
+ return storagePath.resolve(Path.of(CHATROOMS_FILENAME));
+ }
+
+ Path chatroomPath(ChatRoomTo chatroomTo)
+ {
+ return storagePath.resolve(Path.of(chatroomTo.getId().toString() + ".json"));
+ }
+}
--- /dev/null
+package de.juplo.kafka.chat.backend.persistence;
+
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonToken;
+import com.fasterxml.jackson.databind.JavaType;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.reactivestreams.Publisher;
+import org.reactivestreams.Subscriber;
+import org.reactivestreams.Subscription;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.NoSuchFileException;
+import java.nio.file.Path;
+import java.util.Iterator;
+import java.util.List;
+import java.util.function.Consumer;
+
+
+@RequiredArgsConstructor
+@Slf4j
+public class JsonFilePublisher<T> implements Publisher<T>
+{
+ private final Path path;
+ private final ObjectMapper mapper;
+ private final JavaType type;
+
+
+ @Override
+ public void subscribe(Subscriber<? super T> subscriber)
+ {
+ log.info("Reading chatrooms from {}", path);
+
+ try
+ {
+ JsonParser parser =
+ mapper.getFactory().createParser(Files.newBufferedReader(path));
+
+ if (parser.nextToken() != JsonToken.START_ARRAY)
+ {
+ throw new IllegalStateException("Expected content to be an array");
+ }
+
+ subscriber.onSubscribe(new JsonFileSubscription(subscriber, parser));
+ }
+ catch (NoSuchFileException e)
+ {
+ log.info("{} does not exist - starting with empty ChatHome", path);
+ subscriber.onSubscribe(new ReplaySubscription(subscriber, List.of()));
+ }
+ catch (IOException | IllegalStateException e)
+ {
+ subscriber.onSubscribe(new ReplaySubscription(subscriber, List.of((s -> s.onError(e)))));
+ }
+ }
+
+ @RequiredArgsConstructor
+ private class JsonFileSubscription implements Subscription
+ {
+ private final Subscriber<? super T> subscriber;
+ private final JsonParser parser;
+
+ @Override
+ public void request(long requested)
+ {
+ try
+ {
+ while (requested > 0 && parser.nextToken() != JsonToken.END_ARRAY)
+ {
+ subscriber.onNext(mapper.readValue(parser, type));
+ requested--;
+ }
+
+ if (requested > 0)
+ subscriber.onComplete();
+ }
+ catch (IOException e)
+ {
+ subscriber.onError(e);
+ }
+ }
+
+ @Override
+ public void cancel() {}
+ }
+
+ private class ReplaySubscription implements Subscription
+ {
+ private final Subscriber<? super T> subscriber;
+ private final Iterator<Consumer<Subscriber<? super T>>> iterator;
+
+ ReplaySubscription(
+ Subscriber<? super T> subscriber,
+ Iterable<Consumer<Subscriber<? super T>>> actions)
+ {
+ this.subscriber = subscriber;
+ this.iterator = actions.iterator();
+ }
+
+ @Override
+ public void request(long requested)
+ {
+ while (requested > 0 && iterator.hasNext())
+ {
+ iterator.next().accept(subscriber);
+ requested--;
+ }
+
+ if (requested > 0)
+ subscriber.onComplete();
+ }
+
+ @Override
+ public void cancel() {}
+ }
+}
--- /dev/null
+package de.juplo.kafka.chat.backend.persistence;
+
+import de.juplo.kafka.chat.backend.domain.ChatRoom;
+import de.juplo.kafka.chat.backend.domain.ChatHomeService;
+import lombok.extern.slf4j.Slf4j;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+import java.time.Clock;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.UUID;
+
+
+@Slf4j
+public class InMemoryChatHomeService implements ChatHomeService<InMemoryChatRoomService>
+{
+ private final Map<UUID, ChatRoom> chatrooms;
+ private final Clock clock;
+ private final int bufferSize;
+
+
+ public InMemoryChatHomeService(
+ Flux<ChatRoom> chatroomFlux,
+ Clock clock,
+ int bufferSize)
+ {
+ log.debug("Creating ChatHomeService with buffer-size {} (for created ChatRoom's)", bufferSize);
+ this.chatrooms = new HashMap<>();
+ chatroomFlux.subscribe(chatroom -> chatrooms.put(chatroom.getId(), chatroom));
+ this.clock = clock;
+ this.bufferSize = bufferSize;
+ }
+
+ @Override
+ public Mono<ChatRoom> createChatRoom(UUID id, String name)
+ {
+ InMemoryChatRoomService service =
+ new InMemoryChatRoomService(new LinkedHashMap<>());
+ ChatRoom chatRoom = new ChatRoom(id, name, clock, service, bufferSize);
+ chatrooms.put(chatRoom.getId(), chatRoom);
+ return Mono.just(chatRoom);
+ }
+
+ @Override
+ public Mono<ChatRoom> getChatRoom(UUID id)
+ {
+ return Mono.justOrEmpty(chatrooms.get(id));
+ }
+
+ @Override
+ public Flux<ChatRoom> getChatRooms()
+ {
+ return Flux.fromStream(chatrooms.values().stream());
+ }
+}
--- /dev/null
+package de.juplo.kafka.chat.backend.persistence;
+
+import de.juplo.kafka.chat.backend.domain.Message;
+import de.juplo.kafka.chat.backend.domain.ChatRoomService;
+import lombok.extern.slf4j.Slf4j;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+import java.time.LocalDateTime;
+import java.util.LinkedHashMap;
+
+
+@Slf4j
+public class InMemoryChatRoomService implements ChatRoomService
+{
+ private final LinkedHashMap<Message.MessageKey, Message> messages;
+
+
+ public InMemoryChatRoomService(LinkedHashMap<Message.MessageKey, Message> messages)
+ {
+ this.messages = messages;
+ }
+
+ public InMemoryChatRoomService(Flux<Message> messageFlux)
+ {
+ log.debug("Creating InMemoryChatroomService");
+ messages = new LinkedHashMap<>();
+ messageFlux.subscribe(message -> messages.put(message.getKey(), message));
+ }
+
+ @Override
+ public Message persistMessage(
+ Message.MessageKey key,
+ LocalDateTime timestamp,
+ String text)
+ {
+ Message message = new Message(key, (long)messages.size(), timestamp, text);
+ messages.put(message.getKey(), message);
+ return message;
+ }
+
+ @Override
+ public Mono<Message> getMessage(Message.MessageKey key)
+ {
+ return Mono.fromSupplier(() -> messages.get(key));
+ }
+
+ @Override
+ public Flux<Message> getMessages(long first, long last)
+ {
+ return Flux.fromStream(messages
+ .values()
+ .stream()
+ .filter(message ->
+ {
+ long serial = message.getSerialNumber();
+ return serial >= first && serial <= last;
+ }));
+ }
+}
--- /dev/null
+package de.juplo.kafka.chat.backend.persistence;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializationFeature;
+import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
+import de.juplo.kafka.chat.backend.domain.ChatHomeService;
+import lombok.extern.slf4j.Slf4j;
+import org.junit.jupiter.api.BeforeEach;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.time.Clock;
+import java.util.function.Supplier;
+
+
+@Slf4j
+public class LocalJsonFilesStorageStrategyIT extends AbstractStorageStrategyIT
+{
+ final static Path path = Paths.get("target","local-json-files");
+
+ final Clock clock;
+ final ObjectMapper mapper;
+ final LocalJsonFilesStorageStrategy storageStrategy;
+
+
+ public LocalJsonFilesStorageStrategyIT()
+ {
+ clock = Clock.systemDefaultZone();
+ mapper = new ObjectMapper();
+ mapper.registerModule(new JavaTimeModule());
+ mapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
+ storageStrategy =new LocalJsonFilesStorageStrategy(path, clock, 8, mapper);
+
+ }
+
+
+ @Override
+ StorageStrategy getStorageStrategy()
+ {
+ return storageStrategy;
+ }
+
+ @Override
+ Supplier<ChatHomeService> chatHomeServiceSupplier()
+ {
+ return () -> new InMemoryChatHomeService(getStorageStrategy().readChatrooms(), clock, 8);
+ }
+
+ @BeforeEach
+ void reset() throws Exception
+ {
+ if (Files.exists(path))
+ {
+ Files
+ .walk(path)
+ .forEach(file ->
+ {
+ try
+ {
+ if (!file.equals(path))
+ {
+ log.debug("Deleting file {}", file);
+ Files.delete(file);
+ }
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException(e);
+ }
+ });
+ log.debug("Deleting data-directory {}", path);
+ Files.delete(path);
+ }
+ }
+}
+++ /dev/null
-package de.juplo.kafka.chat.backend.persistence;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.SerializationFeature;
-import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
-import de.juplo.kafka.chat.backend.domain.ChatHomeService;
-import lombok.extern.slf4j.Slf4j;
-import org.junit.jupiter.api.BeforeEach;
-
-import java.io.IOException;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.time.Clock;
-import java.util.function.Supplier;
-
-
-@Slf4j
-public class LocalJsonFilesStorageStrategyIT extends AbstractStorageStrategyIT
-{
- final static Path path = Paths.get("target","local-json-files");
-
- final Clock clock;
- final ObjectMapper mapper;
- final LocalJsonFilesStorageStrategy storageStrategy;
-
-
- public LocalJsonFilesStorageStrategyIT()
- {
- clock = Clock.systemDefaultZone();
- mapper = new ObjectMapper();
- mapper.registerModule(new JavaTimeModule());
- mapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
- storageStrategy =new LocalJsonFilesStorageStrategy(path, clock, 8, mapper);
-
- }
-
-
- @Override
- StorageStrategy getStorageStrategy()
- {
- return storageStrategy;
- }
-
- @Override
- Supplier<ChatHomeService> chatHomeServiceSupplier()
- {
- return () -> new InMemoryChatHomeService(getStorageStrategy().readChatrooms(), clock, 8);
- }
-
- @BeforeEach
- void reset() throws Exception
- {
- if (Files.exists(path))
- {
- Files
- .walk(path)
- .forEach(file ->
- {
- try
- {
- if (!file.equals(path))
- {
- log.debug("Deleting file {}", file);
- Files.delete(file);
- }
- }
- catch (IOException e)
- {
- throw new RuntimeException(e);
- }
- });
- log.debug("Deleting data-directory {}", path);
- Files.delete(path);
- }
- }
-}