+++ /dev/null
-package de.juplo.kafka.chat.backend.persistence.storage.files;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import de.juplo.kafka.chat.backend.ChatBackendProperties;
-import de.juplo.kafka.chat.backend.persistence.ShardingStrategy;
-import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
-import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
-import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
-import org.springframework.boot.autoconfigure.data.mongo.MongoRepositoriesAutoConfiguration;
-import org.springframework.boot.autoconfigure.mongo.MongoAutoConfiguration;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Configuration;
-
-import java.nio.file.Paths;
-
-
-@ConditionalOnProperty(
- prefix = "chat.backend.inmemory",
- name = "storage-strategy",
- havingValue = "files")
-@Configuration
-@EnableAutoConfiguration(
- exclude = {
- MongoRepositoriesAutoConfiguration.class,
- MongoAutoConfiguration.class })
-public class FilesStorageConfiguration
-{
- @Bean
- public StorageStrategy storageStrategy(
- ChatBackendProperties properties,
- ShardingStrategy shardingStrategy,
- ObjectMapper mapper)
- {
- return new FilesStorageStrategy(
- Paths.get(properties.getInmemory().getStorageDirectory()),
- shardingStrategy,
- mapper);
- }
-}
+++ /dev/null
-package de.juplo.kafka.chat.backend.persistence.storage.files;
-
-import com.fasterxml.jackson.core.JsonGenerator;
-import com.fasterxml.jackson.databind.JavaType;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import de.juplo.kafka.chat.backend.api.ChatRoomInfoTo;
-import de.juplo.kafka.chat.backend.api.MessageTo;
-import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
-import de.juplo.kafka.chat.backend.domain.Message;
-import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
-import de.juplo.kafka.chat.backend.persistence.ShardingStrategy;
-import lombok.RequiredArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
-import reactor.core.publisher.Flux;
-
-import java.io.IOException;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.util.UUID;
-
-import static java.nio.file.StandardOpenOption.CREATE;
-import static java.nio.file.StandardOpenOption.TRUNCATE_EXISTING;
-
-
-@RequiredArgsConstructor
-@Slf4j
-public class FilesStorageStrategy implements StorageStrategy
-{
- public static final String CHATROOMS_FILENAME = "chatrooms.json";
-
-
- private final Path storagePath;
- private final ShardingStrategy shardingStrategy;
- private final ObjectMapper mapper;
-
-
- @Override
- public void writeChatRoomInfo(Flux<ChatRoomInfo> chatRoomInfoFlux)
- {
- Path path = chatroomsPath();
- log.info("Writing chatrooms to {}", path);
- try
- {
- Files.createDirectories(storagePath);
-
- JsonGenerator generator =
- mapper
- .getFactory()
- .createGenerator(Files.newBufferedWriter(path, CREATE, TRUNCATE_EXISTING));
-
- chatRoomInfoFlux
- .log()
- .doFirst(() ->
- {
- try
- {
- generator.useDefaultPrettyPrinter();
- generator.writeStartArray();
- }
- catch (IOException e)
- {
- throw new RuntimeException(e);
- }
- })
- .doOnTerminate(() ->
- {
- try
- {
- generator.writeEndArray();
- generator.close();
- }
- catch (IOException e)
- {
- throw new RuntimeException(e);
- }
- })
- .subscribe(chatRoomInfo ->
- {
- try
- {
- ChatRoomInfoTo chatRoomInfoTo = ChatRoomInfoTo.from(chatRoomInfo);
- generator.writeObject(chatRoomInfoTo);
- }
- catch (IOException e)
- {
- throw new RuntimeException(e);
- }
- });
- }
- catch (IOException e)
- {
- throw new RuntimeException(e);
- }
- }
-
- @Override
- public Flux<ChatRoomInfo> readChatRoomInfo()
- {
- JavaType type = mapper.getTypeFactory().constructType(ChatRoomInfoTo.class);
- return Flux
- .from(new JsonFilePublisher<ChatRoomInfoTo>(chatroomsPath(), mapper, type))
- .log()
- .map(chatRoomInfoTo ->
- {
- UUID chatRoomId = chatRoomInfoTo.getId();
- int shard = shardingStrategy.selectShard(chatRoomId);
-
- log.info(
- "{} - old shard: {}, new shard: {}",
- chatRoomId,
- chatRoomInfoTo.getShard(),
- shard);
-
- return new ChatRoomInfo(
- chatRoomId,
- chatRoomInfoTo.getName(),
- shard);
- });
- }
-
- @Override
- public void writeChatRoomData(
- UUID chatRoomId,
- Flux<Message> messageFlux)
- {
- Path path = chatroomPath(chatRoomId);
- log.info("Writing messages for {} to {}", chatRoomId, path);
- try
- {
- Files.createDirectories(storagePath);
-
- JsonGenerator generator =
- mapper
- .getFactory()
- .createGenerator(Files.newBufferedWriter(path, CREATE, TRUNCATE_EXISTING));
-
- messageFlux
- .log()
- .doFirst(() ->
- {
- try
- {
- generator.useDefaultPrettyPrinter();
- generator.writeStartArray();
- }
- catch (IOException e)
- {
- throw new RuntimeException(e);
- }
- })
- .doOnTerminate(() ->
- {
- try
- {
- generator.writeEndArray();
- generator.close();
- }
- catch (IOException e)
- {
- throw new RuntimeException(e);
- }
- })
- .subscribe(message ->
- {
- try
- {
- MessageTo messageTo = MessageTo.from(message);
- generator.writeObject(messageTo);
- }
- catch (IOException e)
- {
- throw new RuntimeException(e);
- }
- });
- }
- catch (IOException e)
- {
- throw new RuntimeException(e);
- }
- }
-
- @Override
- public Flux<Message> readChatRoomData(UUID chatRoomId)
- {
- JavaType type = mapper.getTypeFactory().constructType(MessageTo.class);
- return Flux
- .from(new JsonFilePublisher<MessageTo>(chatroomPath(chatRoomId), mapper, type))
- .log()
- .map(MessageTo::toMessage);
- }
-
- Path chatroomsPath()
- {
- return storagePath.resolve(Path.of(CHATROOMS_FILENAME));
- }
-
- Path chatroomPath(UUID id)
- {
- return storagePath.resolve(Path.of(id.toString() + ".json"));
- }
-}
+++ /dev/null
-package de.juplo.kafka.chat.backend.persistence.storage.files;
-
-import com.fasterxml.jackson.core.JsonParser;
-import com.fasterxml.jackson.core.JsonToken;
-import com.fasterxml.jackson.databind.JavaType;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import lombok.RequiredArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
-import org.reactivestreams.Publisher;
-import org.reactivestreams.Subscriber;
-import org.reactivestreams.Subscription;
-
-import java.io.IOException;
-import java.nio.file.Files;
-import java.nio.file.NoSuchFileException;
-import java.nio.file.Path;
-import java.util.Iterator;
-import java.util.List;
-import java.util.function.Consumer;
-
-
-@RequiredArgsConstructor
-@Slf4j
-public class JsonFilePublisher<T> implements Publisher<T>
-{
- private final Path path;
- private final ObjectMapper mapper;
- private final JavaType type;
-
-
- @Override
- public void subscribe(Subscriber<? super T> subscriber)
- {
- log.info("Reading chatrooms from {}", path);
-
- try
- {
- JsonParser parser =
- mapper.getFactory().createParser(Files.newBufferedReader(path));
-
- if (parser.nextToken() != JsonToken.START_ARRAY)
- {
- throw new IllegalStateException("Expected content to be an array");
- }
-
- subscriber.onSubscribe(new JsonFileSubscription(subscriber, parser));
- }
- catch (NoSuchFileException e)
- {
- log.info("{} does not exist - starting with empty ChatHome", path);
- subscriber.onSubscribe(new ReplaySubscription(subscriber, List.of()));
- }
- catch (IOException | IllegalStateException e)
- {
- subscriber.onSubscribe(new ReplaySubscription(subscriber, List.of((s -> s.onError(e)))));
- }
- }
-
- @RequiredArgsConstructor
- private class JsonFileSubscription implements Subscription
- {
- private final Subscriber<? super T> subscriber;
- private final JsonParser parser;
-
- @Override
- public void request(long requested)
- {
- try
- {
- while (requested > 0 && parser.nextToken() != JsonToken.END_ARRAY)
- {
- subscriber.onNext(mapper.readValue(parser, type));
- requested--;
- }
-
- if (requested > 0)
- subscriber.onComplete();
- }
- catch (IOException e)
- {
- subscriber.onError(e);
- }
- }
-
- @Override
- public void cancel() {}
- }
-
- private class ReplaySubscription implements Subscription
- {
- private final Subscriber<? super T> subscriber;
- private final Iterator<Consumer<Subscriber<? super T>>> iterator;
-
- ReplaySubscription(
- Subscriber<? super T> subscriber,
- Iterable<Consumer<Subscriber<? super T>>> actions)
- {
- this.subscriber = subscriber;
- this.iterator = actions.iterator();
- }
-
- @Override
- public void request(long requested)
- {
- while (requested > 0 && iterator.hasNext())
- {
- iterator.next().accept(subscriber);
- requested--;
- }
-
- if (requested > 0)
- subscriber.onComplete();
- }
-
- @Override
- public void cancel() {}
- }
-}
+++ /dev/null
-package de.juplo.kafka.chat.backend.persistence.storage.mongodb;
-
-import org.springframework.data.mongodb.repository.MongoRepository;
-
-
-public interface ChatRoomRepository extends MongoRepository<ChatRoomTo, String>
-{
-}
+++ /dev/null
-package de.juplo.kafka.chat.backend.persistence.storage.mongodb;
-
-import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
-import lombok.*;
-import org.springframework.data.annotation.Id;
-import org.springframework.data.mongodb.core.mapping.Document;
-
-
-@AllArgsConstructor
-@NoArgsConstructor
-@Getter(AccessLevel.PACKAGE)
-@Setter(AccessLevel.PACKAGE)
-@EqualsAndHashCode(of = { "id" })
-@ToString(of = { "id", "shard", "name" })
-@Document
-public class ChatRoomTo
-{
- @Id
- private String id;
- private Integer shard;
- private String name;
-
- public static ChatRoomTo from(ChatRoomInfo chatRoomInfo)
- {
- return new ChatRoomTo(
- chatRoomInfo.getId().toString(),
- chatRoomInfo.getShard(),
- chatRoomInfo.getName());
- }
-}
+++ /dev/null
-package de.juplo.kafka.chat.backend.persistence.storage.mongodb;
-
-import org.springframework.data.mongodb.repository.MongoRepository;
-
-import java.util.List;
-
-
-public interface MessageRepository extends MongoRepository<MessageTo, String>
-{
- List<MessageTo> findByChatRoomIdOrderBySerialAsc(String chatRoomId);
-}
+++ /dev/null
-package de.juplo.kafka.chat.backend.persistence.storage.mongodb;
-
-import de.juplo.kafka.chat.backend.domain.Message;
-import lombok.*;
-import org.springframework.data.mongodb.core.index.Indexed;
-import org.springframework.data.mongodb.core.mapping.Document;
-import org.springframework.data.mongodb.core.mapping.Field;
-
-import java.time.LocalDateTime;
-import java.util.UUID;
-
-
-@AllArgsConstructor
-@NoArgsConstructor
-@Getter(AccessLevel.PACKAGE)
-@Setter(AccessLevel.PACKAGE)
-@EqualsAndHashCode(of = { "chatRoomId", "user", "id" })
-@ToString(of = { "chatRoomId", "user", "id" })
-@Document
-class MessageTo
-{
- @Indexed
- private String chatRoomId;
- @Indexed
- private String user;
- @Field("id")
- @Indexed
- private Long id;
- @Indexed
- private Long serial;
- private String time;
- private String text;
-
- Message toMessage()
- {
- return new Message(
- Message.MessageKey.of(user, id),
- serial,
- LocalDateTime.parse(time),
- text);
- }
-
- static MessageTo from(UUID chatRoomId, Message message)
- {
- return
- new MessageTo(
- chatRoomId.toString(),
- message.getUsername(),
- message.getId(),
- message.getSerialNumber(),
- message.getTimestamp().toString(),
- message.getMessageText());
- }
-}
+++ /dev/null
-package de.juplo.kafka.chat.backend.persistence.storage.mongodb;
-
-import de.juplo.kafka.chat.backend.persistence.ShardingStrategy;
-import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
-import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Configuration;
-
-
-@ConditionalOnProperty(
- prefix = "chat.backend.inmemory",
- name = "storage-strategy",
- havingValue = "mongodb")
-@Configuration
-public class MongoDbStorageConfiguration
-{
- @Bean
- public StorageStrategy storageStrategy(
- ChatRoomRepository chatRoomRepository,
- MessageRepository messageRepository,
- ShardingStrategy shardingStrategy)
- {
- return new MongoDbStorageStrategy(
- chatRoomRepository,
- messageRepository,
- shardingStrategy);
- }
-}
+++ /dev/null
-package de.juplo.kafka.chat.backend.persistence.storage.mongodb;
-
-import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
-import de.juplo.kafka.chat.backend.domain.Message;
-import de.juplo.kafka.chat.backend.persistence.ShardingStrategy;
-import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
-import lombok.RequiredArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
-import reactor.core.publisher.Flux;
-
-import java.util.UUID;
-
-
-@RequiredArgsConstructor
-@Slf4j
-public class MongoDbStorageStrategy implements StorageStrategy
-{
- private final ChatRoomRepository chatRoomRepository;
- private final MessageRepository messageRepository;
- private final ShardingStrategy shardingStrategy;
-
-
- @Override
- public void writeChatRoomInfo(Flux<ChatRoomInfo> chatRoomInfoFlux)
- {
- chatRoomInfoFlux
- .map(ChatRoomTo::from)
- .subscribe(chatroomTo -> chatRoomRepository.save(chatroomTo));
- }
-
- @Override
- public Flux<ChatRoomInfo> readChatRoomInfo()
- {
- return Flux
- .fromIterable(chatRoomRepository.findAll())
- .map(chatRoomTo ->
- {
- UUID chatRoomId = UUID.fromString(chatRoomTo.getId());
- int shard = shardingStrategy.selectShard(chatRoomId);
-
- log.info(
- "{} - old shard: {}, new shard: {}",
- chatRoomId,
- chatRoomTo.getShard(),
- shard);
-
- return new ChatRoomInfo(
- chatRoomId,
- chatRoomTo.getName(),
- shard);
- });
- }
-
- @Override
- public void writeChatRoomData(UUID chatRoomId, Flux<Message> messageFlux)
- {
- messageFlux
- .map(message -> MessageTo.from(chatRoomId, message))
- .subscribe(messageTo -> messageRepository.save(messageTo));
- }
-
- @Override
- public Flux<Message> readChatRoomData(UUID chatRoomId)
- {
- return Flux
- .fromIterable(messageRepository.findByChatRoomIdOrderBySerialAsc(chatRoomId.toString()))
- .map(messageTo -> messageTo.toMessage());
- }
-}
+++ /dev/null
-package de.juplo.kafka.chat.backend.persistence.storage.nostorage;
-
-import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
-import de.juplo.kafka.chat.backend.domain.Message;
-import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
-import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
-import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
-import org.springframework.boot.autoconfigure.data.mongo.MongoRepositoriesAutoConfiguration;
-import org.springframework.boot.autoconfigure.mongo.MongoAutoConfiguration;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Configuration;
-import reactor.core.publisher.Flux;
-
-import java.util.UUID;
-
-
-@ConditionalOnProperty(
- prefix = "chat.backend.inmemory",
- name = "storage-strategy",
- havingValue = "none",
- matchIfMissing = true)
-@Configuration
-@EnableAutoConfiguration(
- exclude = {
- MongoRepositoriesAutoConfiguration.class,
- MongoAutoConfiguration.class })
-public class NoStorageStorageConfiguration
-{
- @Bean
- public StorageStrategy storageStrategy()
- {
- return new StorageStrategy()
- {
- @Override
- public void writeChatRoomInfo(Flux<ChatRoomInfo> chatRoomInfoFlux) {}
-
- @Override
- public Flux<ChatRoomInfo> readChatRoomInfo()
- {
- return Flux.empty();
- }
-
- @Override
- public void writeChatRoomData(UUID chatRoomId, Flux<Message> messageFlux) {}
-
- @Override
- public Flux<Message> readChatRoomData(UUID chatRoomId)
- {
- return Flux.empty();
- }
- };
- }
-}
--- /dev/null
+package de.juplo.kafka.chat.backend.persistence.storage.files;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import de.juplo.kafka.chat.backend.ChatBackendProperties;
+import de.juplo.kafka.chat.backend.persistence.ShardingStrategy;
+import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
+import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.boot.autoconfigure.data.mongo.MongoRepositoriesAutoConfiguration;
+import org.springframework.boot.autoconfigure.mongo.MongoAutoConfiguration;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+import java.nio.file.Paths;
+
+
+@ConditionalOnProperty(
+ prefix = "chat.backend.inmemory",
+ name = "storage-strategy",
+ havingValue = "files")
+@Configuration
+@EnableAutoConfiguration(
+ exclude = {
+ MongoRepositoriesAutoConfiguration.class,
+ MongoAutoConfiguration.class })
+public class FilesStorageConfiguration
+{
+ @Bean
+ public StorageStrategy storageStrategy(
+ ChatBackendProperties properties,
+ ShardingStrategy shardingStrategy,
+ ObjectMapper mapper)
+ {
+ return new FilesStorageStrategy(
+ Paths.get(properties.getInmemory().getStorageDirectory()),
+ shardingStrategy,
+ mapper);
+ }
+}
--- /dev/null
+package de.juplo.kafka.chat.backend.persistence.storage.files;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.JavaType;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import de.juplo.kafka.chat.backend.api.ChatRoomInfoTo;
+import de.juplo.kafka.chat.backend.api.MessageTo;
+import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
+import de.juplo.kafka.chat.backend.domain.Message;
+import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
+import de.juplo.kafka.chat.backend.persistence.ShardingStrategy;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import reactor.core.publisher.Flux;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.UUID;
+
+import static java.nio.file.StandardOpenOption.CREATE;
+import static java.nio.file.StandardOpenOption.TRUNCATE_EXISTING;
+
+
+@RequiredArgsConstructor
+@Slf4j
+public class FilesStorageStrategy implements StorageStrategy
+{
+ public static final String CHATROOMS_FILENAME = "chatrooms.json";
+
+
+ private final Path storagePath;
+ private final ShardingStrategy shardingStrategy;
+ private final ObjectMapper mapper;
+
+
+ @Override
+ public void writeChatRoomInfo(Flux<ChatRoomInfo> chatRoomInfoFlux)
+ {
+ Path path = chatroomsPath();
+ log.info("Writing chatrooms to {}", path);
+ try
+ {
+ Files.createDirectories(storagePath);
+
+ JsonGenerator generator =
+ mapper
+ .getFactory()
+ .createGenerator(Files.newBufferedWriter(path, CREATE, TRUNCATE_EXISTING));
+
+ chatRoomInfoFlux
+ .log()
+ .doFirst(() ->
+ {
+ try
+ {
+ generator.useDefaultPrettyPrinter();
+ generator.writeStartArray();
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException(e);
+ }
+ })
+ .doOnTerminate(() ->
+ {
+ try
+ {
+ generator.writeEndArray();
+ generator.close();
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException(e);
+ }
+ })
+ .subscribe(chatRoomInfo ->
+ {
+ try
+ {
+ ChatRoomInfoTo chatRoomInfoTo = ChatRoomInfoTo.from(chatRoomInfo);
+ generator.writeObject(chatRoomInfoTo);
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException(e);
+ }
+ });
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public Flux<ChatRoomInfo> readChatRoomInfo()
+ {
+ JavaType type = mapper.getTypeFactory().constructType(ChatRoomInfoTo.class);
+ return Flux
+ .from(new JsonFilePublisher<ChatRoomInfoTo>(chatroomsPath(), mapper, type))
+ .log()
+ .map(chatRoomInfoTo ->
+ {
+ UUID chatRoomId = chatRoomInfoTo.getId();
+ int shard = shardingStrategy.selectShard(chatRoomId);
+
+ log.info(
+ "{} - old shard: {}, new shard: {}",
+ chatRoomId,
+ chatRoomInfoTo.getShard(),
+ shard);
+
+ return new ChatRoomInfo(
+ chatRoomId,
+ chatRoomInfoTo.getName(),
+ shard);
+ });
+ }
+
+ @Override
+ public void writeChatRoomData(
+ UUID chatRoomId,
+ Flux<Message> messageFlux)
+ {
+ Path path = chatroomPath(chatRoomId);
+ log.info("Writing messages for {} to {}", chatRoomId, path);
+ try
+ {
+ Files.createDirectories(storagePath);
+
+ JsonGenerator generator =
+ mapper
+ .getFactory()
+ .createGenerator(Files.newBufferedWriter(path, CREATE, TRUNCATE_EXISTING));
+
+ messageFlux
+ .log()
+ .doFirst(() ->
+ {
+ try
+ {
+ generator.useDefaultPrettyPrinter();
+ generator.writeStartArray();
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException(e);
+ }
+ })
+ .doOnTerminate(() ->
+ {
+ try
+ {
+ generator.writeEndArray();
+ generator.close();
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException(e);
+ }
+ })
+ .subscribe(message ->
+ {
+ try
+ {
+ MessageTo messageTo = MessageTo.from(message);
+ generator.writeObject(messageTo);
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException(e);
+ }
+ });
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public Flux<Message> readChatRoomData(UUID chatRoomId)
+ {
+ JavaType type = mapper.getTypeFactory().constructType(MessageTo.class);
+ return Flux
+ .from(new JsonFilePublisher<MessageTo>(chatroomPath(chatRoomId), mapper, type))
+ .log()
+ .map(MessageTo::toMessage);
+ }
+
+ Path chatroomsPath()
+ {
+ return storagePath.resolve(Path.of(CHATROOMS_FILENAME));
+ }
+
+ Path chatroomPath(UUID id)
+ {
+ return storagePath.resolve(Path.of(id.toString() + ".json"));
+ }
+}
--- /dev/null
+package de.juplo.kafka.chat.backend.persistence.storage.files;
+
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonToken;
+import com.fasterxml.jackson.databind.JavaType;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.reactivestreams.Publisher;
+import org.reactivestreams.Subscriber;
+import org.reactivestreams.Subscription;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.NoSuchFileException;
+import java.nio.file.Path;
+import java.util.Iterator;
+import java.util.List;
+import java.util.function.Consumer;
+
+
+@RequiredArgsConstructor
+@Slf4j
+public class JsonFilePublisher<T> implements Publisher<T>
+{
+ private final Path path;
+ private final ObjectMapper mapper;
+ private final JavaType type;
+
+
+ @Override
+ public void subscribe(Subscriber<? super T> subscriber)
+ {
+ log.info("Reading chatrooms from {}", path);
+
+ try
+ {
+ JsonParser parser =
+ mapper.getFactory().createParser(Files.newBufferedReader(path));
+
+ if (parser.nextToken() != JsonToken.START_ARRAY)
+ {
+ throw new IllegalStateException("Expected content to be an array");
+ }
+
+ subscriber.onSubscribe(new JsonFileSubscription(subscriber, parser));
+ }
+ catch (NoSuchFileException e)
+ {
+ log.info("{} does not exist - starting with empty ChatHome", path);
+ subscriber.onSubscribe(new ReplaySubscription(subscriber, List.of()));
+ }
+ catch (IOException | IllegalStateException e)
+ {
+ subscriber.onSubscribe(new ReplaySubscription(subscriber, List.of((s -> s.onError(e)))));
+ }
+ }
+
+ @RequiredArgsConstructor
+ private class JsonFileSubscription implements Subscription
+ {
+ private final Subscriber<? super T> subscriber;
+ private final JsonParser parser;
+
+ @Override
+ public void request(long requested)
+ {
+ try
+ {
+ while (requested > 0 && parser.nextToken() != JsonToken.END_ARRAY)
+ {
+ subscriber.onNext(mapper.readValue(parser, type));
+ requested--;
+ }
+
+ if (requested > 0)
+ subscriber.onComplete();
+ }
+ catch (IOException e)
+ {
+ subscriber.onError(e);
+ }
+ }
+
+ @Override
+ public void cancel() {}
+ }
+
+ private class ReplaySubscription implements Subscription
+ {
+ private final Subscriber<? super T> subscriber;
+ private final Iterator<Consumer<Subscriber<? super T>>> iterator;
+
+ ReplaySubscription(
+ Subscriber<? super T> subscriber,
+ Iterable<Consumer<Subscriber<? super T>>> actions)
+ {
+ this.subscriber = subscriber;
+ this.iterator = actions.iterator();
+ }
+
+ @Override
+ public void request(long requested)
+ {
+ while (requested > 0 && iterator.hasNext())
+ {
+ iterator.next().accept(subscriber);
+ requested--;
+ }
+
+ if (requested > 0)
+ subscriber.onComplete();
+ }
+
+ @Override
+ public void cancel() {}
+ }
+}
--- /dev/null
+package de.juplo.kafka.chat.backend.persistence.storage.mongodb;
+
+import org.springframework.data.mongodb.repository.MongoRepository;
+
+
+public interface ChatRoomRepository extends MongoRepository<ChatRoomTo, String>
+{
+}
--- /dev/null
+package de.juplo.kafka.chat.backend.persistence.storage.mongodb;
+
+import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
+import lombok.*;
+import org.springframework.data.annotation.Id;
+import org.springframework.data.mongodb.core.mapping.Document;
+
+
+@AllArgsConstructor
+@NoArgsConstructor
+@Getter(AccessLevel.PACKAGE)
+@Setter(AccessLevel.PACKAGE)
+@EqualsAndHashCode(of = { "id" })
+@ToString(of = { "id", "shard", "name" })
+@Document
+public class ChatRoomTo
+{
+ @Id
+ private String id;
+ private Integer shard;
+ private String name;
+
+ public static ChatRoomTo from(ChatRoomInfo chatRoomInfo)
+ {
+ return new ChatRoomTo(
+ chatRoomInfo.getId().toString(),
+ chatRoomInfo.getShard(),
+ chatRoomInfo.getName());
+ }
+}
--- /dev/null
+package de.juplo.kafka.chat.backend.persistence.storage.mongodb;
+
+import org.springframework.data.mongodb.repository.MongoRepository;
+
+import java.util.List;
+
+
+public interface MessageRepository extends MongoRepository<MessageTo, String>
+{
+ List<MessageTo> findByChatRoomIdOrderBySerialAsc(String chatRoomId);
+}
--- /dev/null
+package de.juplo.kafka.chat.backend.persistence.storage.mongodb;
+
+import de.juplo.kafka.chat.backend.domain.Message;
+import lombok.*;
+import org.springframework.data.mongodb.core.index.Indexed;
+import org.springframework.data.mongodb.core.mapping.Document;
+import org.springframework.data.mongodb.core.mapping.Field;
+
+import java.time.LocalDateTime;
+import java.util.UUID;
+
+
+@AllArgsConstructor
+@NoArgsConstructor
+@Getter(AccessLevel.PACKAGE)
+@Setter(AccessLevel.PACKAGE)
+@EqualsAndHashCode(of = { "chatRoomId", "user", "id" })
+@ToString(of = { "chatRoomId", "user", "id" })
+@Document
+class MessageTo
+{
+ @Indexed
+ private String chatRoomId;
+ @Indexed
+ private String user;
+ @Field("id")
+ @Indexed
+ private Long id;
+ @Indexed
+ private Long serial;
+ private String time;
+ private String text;
+
+ Message toMessage()
+ {
+ return new Message(
+ Message.MessageKey.of(user, id),
+ serial,
+ LocalDateTime.parse(time),
+ text);
+ }
+
+ static MessageTo from(UUID chatRoomId, Message message)
+ {
+ return
+ new MessageTo(
+ chatRoomId.toString(),
+ message.getUsername(),
+ message.getId(),
+ message.getSerialNumber(),
+ message.getTimestamp().toString(),
+ message.getMessageText());
+ }
+}
--- /dev/null
+package de.juplo.kafka.chat.backend.persistence.storage.mongodb;
+
+import de.juplo.kafka.chat.backend.persistence.ShardingStrategy;
+import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+
+@ConditionalOnProperty(
+ prefix = "chat.backend.inmemory",
+ name = "storage-strategy",
+ havingValue = "mongodb")
+@Configuration
+public class MongoDbStorageConfiguration
+{
+ @Bean
+ public StorageStrategy storageStrategy(
+ ChatRoomRepository chatRoomRepository,
+ MessageRepository messageRepository,
+ ShardingStrategy shardingStrategy)
+ {
+ return new MongoDbStorageStrategy(
+ chatRoomRepository,
+ messageRepository,
+ shardingStrategy);
+ }
+}
--- /dev/null
+package de.juplo.kafka.chat.backend.persistence.storage.mongodb;
+
+import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
+import de.juplo.kafka.chat.backend.domain.Message;
+import de.juplo.kafka.chat.backend.persistence.ShardingStrategy;
+import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import reactor.core.publisher.Flux;
+
+import java.util.UUID;
+
+
+@RequiredArgsConstructor
+@Slf4j
+public class MongoDbStorageStrategy implements StorageStrategy
+{
+ private final ChatRoomRepository chatRoomRepository;
+ private final MessageRepository messageRepository;
+ private final ShardingStrategy shardingStrategy;
+
+
+ @Override
+ public void writeChatRoomInfo(Flux<ChatRoomInfo> chatRoomInfoFlux)
+ {
+ chatRoomInfoFlux
+ .map(ChatRoomTo::from)
+ .subscribe(chatroomTo -> chatRoomRepository.save(chatroomTo));
+ }
+
+ @Override
+ public Flux<ChatRoomInfo> readChatRoomInfo()
+ {
+ return Flux
+ .fromIterable(chatRoomRepository.findAll())
+ .map(chatRoomTo ->
+ {
+ UUID chatRoomId = UUID.fromString(chatRoomTo.getId());
+ int shard = shardingStrategy.selectShard(chatRoomId);
+
+ log.info(
+ "{} - old shard: {}, new shard: {}",
+ chatRoomId,
+ chatRoomTo.getShard(),
+ shard);
+
+ return new ChatRoomInfo(
+ chatRoomId,
+ chatRoomTo.getName(),
+ shard);
+ });
+ }
+
+ @Override
+ public void writeChatRoomData(UUID chatRoomId, Flux<Message> messageFlux)
+ {
+ messageFlux
+ .map(message -> MessageTo.from(chatRoomId, message))
+ .subscribe(messageTo -> messageRepository.save(messageTo));
+ }
+
+ @Override
+ public Flux<Message> readChatRoomData(UUID chatRoomId)
+ {
+ return Flux
+ .fromIterable(messageRepository.findByChatRoomIdOrderBySerialAsc(chatRoomId.toString()))
+ .map(messageTo -> messageTo.toMessage());
+ }
+}
--- /dev/null
+package de.juplo.kafka.chat.backend.persistence.storage.nostorage;
+
+import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
+import de.juplo.kafka.chat.backend.domain.Message;
+import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
+import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.boot.autoconfigure.data.mongo.MongoRepositoriesAutoConfiguration;
+import org.springframework.boot.autoconfigure.mongo.MongoAutoConfiguration;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import reactor.core.publisher.Flux;
+
+import java.util.UUID;
+
+
+@ConditionalOnProperty(
+ prefix = "chat.backend.inmemory",
+ name = "storage-strategy",
+ havingValue = "none",
+ matchIfMissing = true)
+@Configuration
+@EnableAutoConfiguration(
+ exclude = {
+ MongoRepositoriesAutoConfiguration.class,
+ MongoAutoConfiguration.class })
+public class NoStorageStorageConfiguration
+{
+ @Bean
+ public StorageStrategy storageStrategy()
+ {
+ return new StorageStrategy()
+ {
+ @Override
+ public void writeChatRoomInfo(Flux<ChatRoomInfo> chatRoomInfoFlux) {}
+
+ @Override
+ public Flux<ChatRoomInfo> readChatRoomInfo()
+ {
+ return Flux.empty();
+ }
+
+ @Override
+ public void writeChatRoomData(UUID chatRoomId, Flux<Message> messageFlux) {}
+
+ @Override
+ public Flux<Message> readChatRoomData(UUID chatRoomId)
+ {
+ return Flux.empty();
+ }
+ };
+ }
+}
--- /dev/null
+package de.juplo.kafka.chat.backend.persistence;
+
+import de.juplo.kafka.chat.backend.domain.ChatHomeService;
+import de.juplo.kafka.chat.backend.persistence.inmemory.SimpleChatHomeService;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
+import java.time.Clock;
+
+
+@RequiredArgsConstructor
+@Slf4j
+public abstract class AbstractInMemoryStorageIT extends AbstractStorageStrategyIT
+{
+ final Clock clock;
+
+ @Override
+ protected StorageStrategyITConfig getConfig()
+ {
+ return new StorageStrategyITConfig()
+ {
+ int bufferSize = 8;
+
+ SimpleChatHomeService simpleChatHome = new SimpleChatHomeService(
+ getStorageStrategy(),
+ clock,
+ bufferSize);
+
+ @Override
+ public ChatHomeService getChatHome()
+ {
+ return simpleChatHome;
+ }
+ };
+ }
+}
--- /dev/null
+package de.juplo.kafka.chat.backend.persistence;
+
+import de.juplo.kafka.chat.backend.domain.*;
+import lombok.extern.slf4j.Slf4j;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+import java.util.UUID;
+
+import static pl.rzrz.assertj.reactor.Assertions.*;
+
+
+@Slf4j
+public abstract class AbstractStorageStrategyIT
+{
+ protected ChatHomeService chathome;
+
+
+ protected abstract StorageStrategy getStorageStrategy();
+ protected abstract StorageStrategyITConfig getConfig();
+
+ protected void start()
+ {
+ StorageStrategyITConfig config = getConfig();
+ chathome = config.getChatHome();
+ }
+
+ protected void stop()
+ {
+ getStorageStrategy().write(chathome);
+ }
+
+ @Test
+ protected void testStoreAndRecreate()
+ {
+ start();
+
+ assertThat(chathome.getChatRoomInfo().toStream()).hasSize(0);
+
+ UUID chatRoomId = UUID.fromString("5c73531c-6fc4-426c-adcb-afc5c140a0f7");
+ ChatRoomInfo info = chathome.createChatRoom(chatRoomId, "FOO").block();
+ log.debug("Created chat-room {}", info);
+ ChatRoomData chatroom = chathome.getChatRoomData(chatRoomId).block();
+ Message m1 = chatroom.addMessage(1l,"peter", "Hallo, ich heiße Peter!").block();
+ Message m2 = chatroom.addMessage(1l, "ute", "Ich bin Ute...").block();
+ Message m3 = chatroom.addMessage(2l, "peter", "Willst du mit mir gehen?").block();
+ Message m4 = chatroom.addMessage(1l, "klaus", "Ja? Nein? Vielleicht??").block();
+
+ assertThat(chathome.getChatRoomInfo().toStream()).containsExactlyElementsOf(List.of(info));
+ assertThat(chathome.getChatRoomInfo(chatRoomId)).emitsExactly(info);
+ assertThat(chathome
+ .getChatRoomData(chatRoomId)
+ .flatMapMany(cr -> cr.getMessages())).emitsExactly(m1, m2, m3, m4);
+
+ stop();
+ start();
+
+ assertThat(chathome.getChatRoomInfo().toStream()).containsExactlyElementsOf(List.of(info));
+ assertThat(chathome.getChatRoomInfo(chatRoomId)).emitsExactly(info);
+ assertThat(chathome
+ .getChatRoomData(chatRoomId)
+ .flatMapMany(cr -> cr.getMessages())).emitsExactly(m1, m2, m3, m4);
+ }
+
+ @Test
+ protected void testStoreAndRecreateParallelChatRooms()
+ {
+ start();
+
+ assertThat(chathome.getChatRoomInfo().toStream()).hasSize(0);
+
+ UUID chatRoomAId = UUID.fromString("5c73531c-6fc4-426c-adcb-afc5c140a0f7");
+ ChatRoomInfo infoA = chathome.createChatRoom(chatRoomAId, "FOO").block();
+ log.debug("Created chat-room {}", infoA);
+ ChatRoomData chatroomA = chathome.getChatRoomData(chatRoomAId).block();
+ Message ma1 = chatroomA.addMessage(1l,"peter", "Hallo, ich heiße Peter!").block();
+ Message ma2 = chatroomA.addMessage(1l, "ute", "Ich bin Ute...").block();
+ Message ma3 = chatroomA.addMessage(2l, "peter", "Willst du mit mir gehen?").block();
+ Message ma4 = chatroomA.addMessage(1l, "klaus", "Ja? Nein? Vielleicht??").block();
+
+ UUID chatRoomBId = UUID.fromString("8763dfdc-4dda-4a74-bea4-4b389177abea");
+ ChatRoomInfo infoB = chathome.createChatRoom(chatRoomBId, "BAR").block();
+ log.debug("Created chat-room {}", infoB);
+ ChatRoomData chatroomB = chathome.getChatRoomData(chatRoomBId).block();
+ Message mb1 = chatroomB.addMessage(1l,"peter", "Hallo, ich heiße Uwe!").block();
+ Message mb2 = chatroomB.addMessage(1l, "ute", "Ich bin Ute...").block();
+ Message mb3 = chatroomB.addMessage(1l, "klaus", "Willst du mit mir gehen?").block();
+ Message mb4 = chatroomB.addMessage(2l, "peter", "Hä? Was jetzt?!? Isch glohb isch höb ühn däjah vüh...").block();
+
+ assertThat(chathome.getChatRoomInfo().toStream()).containsExactlyInAnyOrderElementsOf(List.of(infoA, infoB));
+ assertThat(chathome.getChatRoomInfo(chatRoomAId)).emitsExactly(infoA);
+ assertThat(chathome
+ .getChatRoomData(chatRoomAId)
+ .flatMapMany(cr -> cr.getMessages())).emitsExactly(ma1, ma2, ma3, ma4);
+ assertThat(chathome.getChatRoomData(chatRoomBId)).emitsExactly(chatroomB);
+ assertThat(chathome
+ .getChatRoomData(chatRoomBId)
+ .flatMapMany(cr -> cr.getMessages())).emitsExactly(mb1, mb2, mb3, mb4);
+
+ stop();
+ start();
+
+ assertThat(chathome.getChatRoomInfo().toStream()).containsExactlyInAnyOrderElementsOf(List.of(infoA, infoB));
+ assertThat(chathome.getChatRoomInfo(chatRoomAId)).emitsExactly(infoA);
+ assertThat(chathome
+ .getChatRoomData(chatRoomAId)
+ .flatMapMany(cr -> cr.getMessages())).emitsExactly(ma1, ma2, ma3, ma4);
+ assertThat(chathome.getChatRoomInfo(chatRoomBId)).emitsExactly(infoB);
+ assertThat(chathome
+ .getChatRoomData(chatRoomBId)
+ .flatMapMany(cr -> cr.getMessages())).emitsExactly(mb1, mb2, mb3, mb4);
+ }
+
+
+ interface StorageStrategyITConfig
+ {
+ ChatHomeService getChatHome();
+ }
+}
--- /dev/null
+package de.juplo.kafka.chat.backend.persistence;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializationFeature;
+import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
+import de.juplo.kafka.chat.backend.persistence.storage.files.FilesStorageStrategy;
+import lombok.extern.slf4j.Slf4j;
+import org.junit.jupiter.api.BeforeEach;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.time.Clock;
+
+
+@Slf4j
+public class InMemoryWithFilesStorageIT extends AbstractInMemoryStorageIT
+{
+ final static Path path = Paths.get("target","files");
+
+ final ObjectMapper mapper;
+ final FilesStorageStrategy storageStrategy;
+
+
+ public InMemoryWithFilesStorageIT()
+ {
+ super(Clock.systemDefaultZone());
+ mapper = new ObjectMapper();
+ mapper.registerModule(new JavaTimeModule());
+ mapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
+ storageStrategy = new FilesStorageStrategy(
+ path,
+ chatRoomId -> 0,
+ mapper);
+ }
+
+
+ @Override
+ protected StorageStrategy getStorageStrategy()
+ {
+ return storageStrategy;
+ }
+
+ @BeforeEach
+ void reset() throws Exception
+ {
+ if (Files.exists(path))
+ {
+ Files
+ .walk(path)
+ .forEach(file ->
+ {
+ try
+ {
+ if (!file.equals(path))
+ {
+ log.debug("Deleting file {}", file);
+ Files.delete(file);
+ }
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException(e);
+ }
+ });
+ log.debug("Deleting data-directory {}", path);
+ Files.delete(path);
+ }
+ }
+}
--- /dev/null
+package de.juplo.kafka.chat.backend.persistence;
+
+import de.juplo.kafka.chat.backend.persistence.InMemoryWithMongoDbStorageIT.DataSourceInitializer;
+import de.juplo.kafka.chat.backend.persistence.storage.mongodb.ChatRoomRepository;
+import de.juplo.kafka.chat.backend.persistence.storage.mongodb.MessageRepository;
+import de.juplo.kafka.chat.backend.persistence.storage.mongodb.MongoDbStorageStrategy;
+import lombok.extern.slf4j.Slf4j;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
+import org.springframework.boot.test.autoconfigure.data.mongo.AutoConfigureDataMongo;
+import org.springframework.boot.test.context.TestConfiguration;
+import org.springframework.context.ApplicationContextInitializer;
+import org.springframework.context.ConfigurableApplicationContext;
+import org.springframework.context.annotation.Bean;
+import org.springframework.test.context.ContextConfiguration;
+import org.springframework.test.context.junit.jupiter.SpringExtension;
+import org.springframework.test.context.support.TestPropertySourceUtils;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+
+import java.time.Clock;
+
+
+@Testcontainers
+@ExtendWith({SpringExtension.class})
+@EnableAutoConfiguration
+@AutoConfigureDataMongo
+@ContextConfiguration(initializers = DataSourceInitializer.class)
+@Slf4j
+public class InMemoryWithMongoDbStorageIT extends AbstractInMemoryStorageIT
+{
+ @Autowired
+ MongoDbStorageStrategy storageStrategy;
+ @Autowired
+ ChatRoomRepository chatRoomRepository;
+ @Autowired
+ MessageRepository messageRepository;
+
+
+ public InMemoryWithMongoDbStorageIT()
+ {
+ super(Clock.systemDefaultZone());
+ }
+
+
+ @Override
+ protected StorageStrategy getStorageStrategy()
+ {
+ return storageStrategy;
+ }
+
+ @TestConfiguration
+ static class InMemoryWithMongoDbStorageStrategyITConfig
+ {
+ @Bean
+ MongoDbStorageStrategy storageStrategy(
+ ChatRoomRepository chatRoomRepository,
+ MessageRepository messageRepository,
+ Clock clock)
+ {
+ return new MongoDbStorageStrategy(
+ chatRoomRepository,
+ messageRepository,
+ chatRoomId -> 0);
+ }
+
+ @Bean
+ Clock clock()
+ {
+ return Clock.systemDefaultZone();
+ }
+ }
+
+ private static final int MONGODB_PORT = 27017;
+
+ @Container
+ private static final GenericContainer CONTAINER =
+ new GenericContainer("mongo:6")
+ .withExposedPorts(MONGODB_PORT);
+
+ public static class DataSourceInitializer
+ implements ApplicationContextInitializer<ConfigurableApplicationContext>
+ {
+ @Override
+ public void initialize(ConfigurableApplicationContext applicationContext)
+ {
+ TestPropertySourceUtils.addInlinedPropertiesToEnvironment(
+ applicationContext,
+ "spring.data.mongodb.host=localhost",
+ "spring.data.mongodb.port=" + CONTAINER.getMappedPort(MONGODB_PORT),
+ "spring.data.mongodb.database=test");
+ }
+ }
+
+ @BeforeEach
+ void setUpLogging()
+ {
+ Slf4jLogConsumer logConsumer = new Slf4jLogConsumer(log);
+ CONTAINER.followOutput(logConsumer);
+ chatRoomRepository.deleteAll();
+ messageRepository.deleteAll();
+ }
+}
+++ /dev/null
-package de.juplo.kafka.chat.backend.persistence;
-
-import de.juplo.kafka.chat.backend.domain.ChatHomeService;
-import de.juplo.kafka.chat.backend.persistence.inmemory.SimpleChatHomeService;
-import lombok.RequiredArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
-
-import java.time.Clock;
-
-
-@RequiredArgsConstructor
-@Slf4j
-public abstract class AbstractInMemoryStorageIT extends AbstractStorageStrategyIT
-{
- final Clock clock;
-
- @Override
- protected StorageStrategyITConfig getConfig()
- {
- return new StorageStrategyITConfig()
- {
- int bufferSize = 8;
-
- SimpleChatHomeService simpleChatHome = new SimpleChatHomeService(
- getStorageStrategy(),
- clock,
- bufferSize);
-
- @Override
- public ChatHomeService getChatHome()
- {
- return simpleChatHome;
- }
- };
- }
-}
+++ /dev/null
-package de.juplo.kafka.chat.backend.persistence;
-
-import de.juplo.kafka.chat.backend.domain.*;
-import lombok.extern.slf4j.Slf4j;
-import org.junit.jupiter.api.Test;
-
-import java.util.List;
-import java.util.UUID;
-
-import static pl.rzrz.assertj.reactor.Assertions.*;
-
-
-@Slf4j
-public abstract class AbstractStorageStrategyIT
-{
- protected ChatHomeService chathome;
-
-
- protected abstract StorageStrategy getStorageStrategy();
- protected abstract StorageStrategyITConfig getConfig();
-
- protected void start()
- {
- StorageStrategyITConfig config = getConfig();
- chathome = config.getChatHome();
- }
-
- protected void stop()
- {
- getStorageStrategy().write(chathome);
- }
-
- @Test
- protected void testStoreAndRecreate()
- {
- start();
-
- assertThat(chathome.getChatRoomInfo().toStream()).hasSize(0);
-
- UUID chatRoomId = UUID.fromString("5c73531c-6fc4-426c-adcb-afc5c140a0f7");
- ChatRoomInfo info = chathome.createChatRoom(chatRoomId, "FOO").block();
- log.debug("Created chat-room {}", info);
- ChatRoomData chatroom = chathome.getChatRoomData(chatRoomId).block();
- Message m1 = chatroom.addMessage(1l,"peter", "Hallo, ich heiße Peter!").block();
- Message m2 = chatroom.addMessage(1l, "ute", "Ich bin Ute...").block();
- Message m3 = chatroom.addMessage(2l, "peter", "Willst du mit mir gehen?").block();
- Message m4 = chatroom.addMessage(1l, "klaus", "Ja? Nein? Vielleicht??").block();
-
- assertThat(chathome.getChatRoomInfo().toStream()).containsExactlyElementsOf(List.of(info));
- assertThat(chathome.getChatRoomInfo(chatRoomId)).emitsExactly(info);
- assertThat(chathome
- .getChatRoomData(chatRoomId)
- .flatMapMany(cr -> cr.getMessages())).emitsExactly(m1, m2, m3, m4);
-
- stop();
- start();
-
- assertThat(chathome.getChatRoomInfo().toStream()).containsExactlyElementsOf(List.of(info));
- assertThat(chathome.getChatRoomInfo(chatRoomId)).emitsExactly(info);
- assertThat(chathome
- .getChatRoomData(chatRoomId)
- .flatMapMany(cr -> cr.getMessages())).emitsExactly(m1, m2, m3, m4);
- }
-
- @Test
- protected void testStoreAndRecreateParallelChatRooms()
- {
- start();
-
- assertThat(chathome.getChatRoomInfo().toStream()).hasSize(0);
-
- UUID chatRoomAId = UUID.fromString("5c73531c-6fc4-426c-adcb-afc5c140a0f7");
- ChatRoomInfo infoA = chathome.createChatRoom(chatRoomAId, "FOO").block();
- log.debug("Created chat-room {}", infoA);
- ChatRoomData chatroomA = chathome.getChatRoomData(chatRoomAId).block();
- Message ma1 = chatroomA.addMessage(1l,"peter", "Hallo, ich heiße Peter!").block();
- Message ma2 = chatroomA.addMessage(1l, "ute", "Ich bin Ute...").block();
- Message ma3 = chatroomA.addMessage(2l, "peter", "Willst du mit mir gehen?").block();
- Message ma4 = chatroomA.addMessage(1l, "klaus", "Ja? Nein? Vielleicht??").block();
-
- UUID chatRoomBId = UUID.fromString("8763dfdc-4dda-4a74-bea4-4b389177abea");
- ChatRoomInfo infoB = chathome.createChatRoom(chatRoomBId, "BAR").block();
- log.debug("Created chat-room {}", infoB);
- ChatRoomData chatroomB = chathome.getChatRoomData(chatRoomBId).block();
- Message mb1 = chatroomB.addMessage(1l,"peter", "Hallo, ich heiße Uwe!").block();
- Message mb2 = chatroomB.addMessage(1l, "ute", "Ich bin Ute...").block();
- Message mb3 = chatroomB.addMessage(1l, "klaus", "Willst du mit mir gehen?").block();
- Message mb4 = chatroomB.addMessage(2l, "peter", "Hä? Was jetzt?!? Isch glohb isch höb ühn däjah vüh...").block();
-
- assertThat(chathome.getChatRoomInfo().toStream()).containsExactlyInAnyOrderElementsOf(List.of(infoA, infoB));
- assertThat(chathome.getChatRoomInfo(chatRoomAId)).emitsExactly(infoA);
- assertThat(chathome
- .getChatRoomData(chatRoomAId)
- .flatMapMany(cr -> cr.getMessages())).emitsExactly(ma1, ma2, ma3, ma4);
- assertThat(chathome.getChatRoomData(chatRoomBId)).emitsExactly(chatroomB);
- assertThat(chathome
- .getChatRoomData(chatRoomBId)
- .flatMapMany(cr -> cr.getMessages())).emitsExactly(mb1, mb2, mb3, mb4);
-
- stop();
- start();
-
- assertThat(chathome.getChatRoomInfo().toStream()).containsExactlyInAnyOrderElementsOf(List.of(infoA, infoB));
- assertThat(chathome.getChatRoomInfo(chatRoomAId)).emitsExactly(infoA);
- assertThat(chathome
- .getChatRoomData(chatRoomAId)
- .flatMapMany(cr -> cr.getMessages())).emitsExactly(ma1, ma2, ma3, ma4);
- assertThat(chathome.getChatRoomInfo(chatRoomBId)).emitsExactly(infoB);
- assertThat(chathome
- .getChatRoomData(chatRoomBId)
- .flatMapMany(cr -> cr.getMessages())).emitsExactly(mb1, mb2, mb3, mb4);
- }
-
-
- interface StorageStrategyITConfig
- {
- ChatHomeService getChatHome();
- }
-}
+++ /dev/null
-package de.juplo.kafka.chat.backend.persistence;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.SerializationFeature;
-import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
-import de.juplo.kafka.chat.backend.persistence.storage.files.FilesStorageStrategy;
-import lombok.extern.slf4j.Slf4j;
-import org.junit.jupiter.api.BeforeEach;
-
-import java.io.IOException;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.time.Clock;
-
-
-@Slf4j
-public class InMemoryWithFilesStorageIT extends AbstractInMemoryStorageIT
-{
- final static Path path = Paths.get("target","files");
-
- final ObjectMapper mapper;
- final FilesStorageStrategy storageStrategy;
-
-
- public InMemoryWithFilesStorageIT()
- {
- super(Clock.systemDefaultZone());
- mapper = new ObjectMapper();
- mapper.registerModule(new JavaTimeModule());
- mapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
- storageStrategy = new FilesStorageStrategy(
- path,
- chatRoomId -> 0,
- mapper);
- }
-
-
- @Override
- protected StorageStrategy getStorageStrategy()
- {
- return storageStrategy;
- }
-
- @BeforeEach
- void reset() throws Exception
- {
- if (Files.exists(path))
- {
- Files
- .walk(path)
- .forEach(file ->
- {
- try
- {
- if (!file.equals(path))
- {
- log.debug("Deleting file {}", file);
- Files.delete(file);
- }
- }
- catch (IOException e)
- {
- throw new RuntimeException(e);
- }
- });
- log.debug("Deleting data-directory {}", path);
- Files.delete(path);
- }
- }
-}
+++ /dev/null
-package de.juplo.kafka.chat.backend.persistence;
-
-import de.juplo.kafka.chat.backend.persistence.InMemoryWithMongoDbStorageIT.DataSourceInitializer;
-import de.juplo.kafka.chat.backend.persistence.storage.mongodb.ChatRoomRepository;
-import de.juplo.kafka.chat.backend.persistence.storage.mongodb.MessageRepository;
-import de.juplo.kafka.chat.backend.persistence.storage.mongodb.MongoDbStorageStrategy;
-import lombok.extern.slf4j.Slf4j;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.extension.ExtendWith;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
-import org.springframework.boot.test.autoconfigure.data.mongo.AutoConfigureDataMongo;
-import org.springframework.boot.test.context.TestConfiguration;
-import org.springframework.context.ApplicationContextInitializer;
-import org.springframework.context.ConfigurableApplicationContext;
-import org.springframework.context.annotation.Bean;
-import org.springframework.test.context.ContextConfiguration;
-import org.springframework.test.context.junit.jupiter.SpringExtension;
-import org.springframework.test.context.support.TestPropertySourceUtils;
-import org.testcontainers.containers.GenericContainer;
-import org.testcontainers.containers.output.Slf4jLogConsumer;
-import org.testcontainers.junit.jupiter.Container;
-import org.testcontainers.junit.jupiter.Testcontainers;
-
-import java.time.Clock;
-
-
-@Testcontainers
-@ExtendWith({SpringExtension.class})
-@EnableAutoConfiguration
-@AutoConfigureDataMongo
-@ContextConfiguration(initializers = DataSourceInitializer.class)
-@Slf4j
-public class InMemoryWithMongoDbStorageIT extends AbstractInMemoryStorageIT
-{
- @Autowired
- MongoDbStorageStrategy storageStrategy;
- @Autowired
- ChatRoomRepository chatRoomRepository;
- @Autowired
- MessageRepository messageRepository;
-
-
- public InMemoryWithMongoDbStorageIT()
- {
- super(Clock.systemDefaultZone());
- }
-
-
- @Override
- protected StorageStrategy getStorageStrategy()
- {
- return storageStrategy;
- }
-
- @TestConfiguration
- static class InMemoryWithMongoDbStorageStrategyITConfig
- {
- @Bean
- MongoDbStorageStrategy storageStrategy(
- ChatRoomRepository chatRoomRepository,
- MessageRepository messageRepository,
- Clock clock)
- {
- return new MongoDbStorageStrategy(
- chatRoomRepository,
- messageRepository,
- chatRoomId -> 0);
- }
-
- @Bean
- Clock clock()
- {
- return Clock.systemDefaultZone();
- }
- }
-
- private static final int MONGODB_PORT = 27017;
-
- @Container
- private static final GenericContainer CONTAINER =
- new GenericContainer("mongo:6")
- .withExposedPorts(MONGODB_PORT);
-
- public static class DataSourceInitializer
- implements ApplicationContextInitializer<ConfigurableApplicationContext>
- {
- @Override
- public void initialize(ConfigurableApplicationContext applicationContext)
- {
- TestPropertySourceUtils.addInlinedPropertiesToEnvironment(
- applicationContext,
- "spring.data.mongodb.host=localhost",
- "spring.data.mongodb.port=" + CONTAINER.getMappedPort(MONGODB_PORT),
- "spring.data.mongodb.database=test");
- }
- }
-
- @BeforeEach
- void setUpLogging()
- {
- Slf4jLogConsumer logConsumer = new Slf4jLogConsumer(log);
- CONTAINER.followOutput(logConsumer);
- chatRoomRepository.deleteAll();
- messageRepository.deleteAll();
- }
-}