From: Kai Moritz Date: Wed, 6 Sep 2023 21:46:29 +0000 (+0200) Subject: refactor: `storage` is not a sub-package of `persistence` - Moved classes X-Git-Tag: rebase--2024-02-03--15-10~31 X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=1bfa69e21b684b079d67cdc9159b0641ae2a7480;p=demos%2Fkafka%2Fchat refactor: `storage` is not a sub-package of `persistence` - Moved classes --- diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/files/FilesStorageConfiguration.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/files/FilesStorageConfiguration.java deleted file mode 100644 index 3a59acb6..00000000 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/files/FilesStorageConfiguration.java +++ /dev/null @@ -1,39 +0,0 @@ -package de.juplo.kafka.chat.backend.persistence.storage.files; - -import com.fasterxml.jackson.databind.ObjectMapper; -import de.juplo.kafka.chat.backend.ChatBackendProperties; -import de.juplo.kafka.chat.backend.persistence.ShardingStrategy; -import de.juplo.kafka.chat.backend.persistence.StorageStrategy; -import org.springframework.boot.autoconfigure.EnableAutoConfiguration; -import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; -import org.springframework.boot.autoconfigure.data.mongo.MongoRepositoriesAutoConfiguration; -import org.springframework.boot.autoconfigure.mongo.MongoAutoConfiguration; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; - -import java.nio.file.Paths; - - -@ConditionalOnProperty( - prefix = "chat.backend.inmemory", - name = "storage-strategy", - havingValue = "files") -@Configuration -@EnableAutoConfiguration( - exclude = { - MongoRepositoriesAutoConfiguration.class, - MongoAutoConfiguration.class }) -public class FilesStorageConfiguration -{ - @Bean - public StorageStrategy storageStrategy( - ChatBackendProperties properties, - ShardingStrategy shardingStrategy, - ObjectMapper mapper) - { - return new FilesStorageStrategy( - Paths.get(properties.getInmemory().getStorageDirectory()), - shardingStrategy, - mapper); - } -} diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/files/FilesStorageStrategy.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/files/FilesStorageStrategy.java deleted file mode 100644 index 9c791977..00000000 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/files/FilesStorageStrategy.java +++ /dev/null @@ -1,201 +0,0 @@ -package de.juplo.kafka.chat.backend.persistence.storage.files; - -import com.fasterxml.jackson.core.JsonGenerator; -import com.fasterxml.jackson.databind.JavaType; -import com.fasterxml.jackson.databind.ObjectMapper; -import de.juplo.kafka.chat.backend.api.ChatRoomInfoTo; -import de.juplo.kafka.chat.backend.api.MessageTo; -import de.juplo.kafka.chat.backend.domain.ChatRoomInfo; -import de.juplo.kafka.chat.backend.domain.Message; -import de.juplo.kafka.chat.backend.persistence.StorageStrategy; -import de.juplo.kafka.chat.backend.persistence.ShardingStrategy; -import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; -import reactor.core.publisher.Flux; - -import java.io.IOException; -import java.nio.file.Files; -import java.nio.file.Path; -import java.util.UUID; - -import static java.nio.file.StandardOpenOption.CREATE; -import static java.nio.file.StandardOpenOption.TRUNCATE_EXISTING; - - -@RequiredArgsConstructor -@Slf4j -public class FilesStorageStrategy implements StorageStrategy -{ - public static final String CHATROOMS_FILENAME = "chatrooms.json"; - - - private final Path storagePath; - private final ShardingStrategy shardingStrategy; - private final ObjectMapper mapper; - - - @Override - public void writeChatRoomInfo(Flux chatRoomInfoFlux) - { - Path path = chatroomsPath(); - log.info("Writing chatrooms to {}", path); - try - { - Files.createDirectories(storagePath); - - JsonGenerator generator = - mapper - .getFactory() - .createGenerator(Files.newBufferedWriter(path, CREATE, TRUNCATE_EXISTING)); - - chatRoomInfoFlux - .log() - .doFirst(() -> - { - try - { - generator.useDefaultPrettyPrinter(); - generator.writeStartArray(); - } - catch (IOException e) - { - throw new RuntimeException(e); - } - }) - .doOnTerminate(() -> - { - try - { - generator.writeEndArray(); - generator.close(); - } - catch (IOException e) - { - throw new RuntimeException(e); - } - }) - .subscribe(chatRoomInfo -> - { - try - { - ChatRoomInfoTo chatRoomInfoTo = ChatRoomInfoTo.from(chatRoomInfo); - generator.writeObject(chatRoomInfoTo); - } - catch (IOException e) - { - throw new RuntimeException(e); - } - }); - } - catch (IOException e) - { - throw new RuntimeException(e); - } - } - - @Override - public Flux readChatRoomInfo() - { - JavaType type = mapper.getTypeFactory().constructType(ChatRoomInfoTo.class); - return Flux - .from(new JsonFilePublisher(chatroomsPath(), mapper, type)) - .log() - .map(chatRoomInfoTo -> - { - UUID chatRoomId = chatRoomInfoTo.getId(); - int shard = shardingStrategy.selectShard(chatRoomId); - - log.info( - "{} - old shard: {}, new shard: {}", - chatRoomId, - chatRoomInfoTo.getShard(), - shard); - - return new ChatRoomInfo( - chatRoomId, - chatRoomInfoTo.getName(), - shard); - }); - } - - @Override - public void writeChatRoomData( - UUID chatRoomId, - Flux messageFlux) - { - Path path = chatroomPath(chatRoomId); - log.info("Writing messages for {} to {}", chatRoomId, path); - try - { - Files.createDirectories(storagePath); - - JsonGenerator generator = - mapper - .getFactory() - .createGenerator(Files.newBufferedWriter(path, CREATE, TRUNCATE_EXISTING)); - - messageFlux - .log() - .doFirst(() -> - { - try - { - generator.useDefaultPrettyPrinter(); - generator.writeStartArray(); - } - catch (IOException e) - { - throw new RuntimeException(e); - } - }) - .doOnTerminate(() -> - { - try - { - generator.writeEndArray(); - generator.close(); - } - catch (IOException e) - { - throw new RuntimeException(e); - } - }) - .subscribe(message -> - { - try - { - MessageTo messageTo = MessageTo.from(message); - generator.writeObject(messageTo); - } - catch (IOException e) - { - throw new RuntimeException(e); - } - }); - } - catch (IOException e) - { - throw new RuntimeException(e); - } - } - - @Override - public Flux readChatRoomData(UUID chatRoomId) - { - JavaType type = mapper.getTypeFactory().constructType(MessageTo.class); - return Flux - .from(new JsonFilePublisher(chatroomPath(chatRoomId), mapper, type)) - .log() - .map(MessageTo::toMessage); - } - - Path chatroomsPath() - { - return storagePath.resolve(Path.of(CHATROOMS_FILENAME)); - } - - Path chatroomPath(UUID id) - { - return storagePath.resolve(Path.of(id.toString() + ".json")); - } -} diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/files/JsonFilePublisher.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/files/JsonFilePublisher.java deleted file mode 100644 index aec8b367..00000000 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/files/JsonFilePublisher.java +++ /dev/null @@ -1,118 +0,0 @@ -package de.juplo.kafka.chat.backend.persistence.storage.files; - -import com.fasterxml.jackson.core.JsonParser; -import com.fasterxml.jackson.core.JsonToken; -import com.fasterxml.jackson.databind.JavaType; -import com.fasterxml.jackson.databind.ObjectMapper; -import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; -import org.reactivestreams.Publisher; -import org.reactivestreams.Subscriber; -import org.reactivestreams.Subscription; - -import java.io.IOException; -import java.nio.file.Files; -import java.nio.file.NoSuchFileException; -import java.nio.file.Path; -import java.util.Iterator; -import java.util.List; -import java.util.function.Consumer; - - -@RequiredArgsConstructor -@Slf4j -public class JsonFilePublisher implements Publisher -{ - private final Path path; - private final ObjectMapper mapper; - private final JavaType type; - - - @Override - public void subscribe(Subscriber subscriber) - { - log.info("Reading chatrooms from {}", path); - - try - { - JsonParser parser = - mapper.getFactory().createParser(Files.newBufferedReader(path)); - - if (parser.nextToken() != JsonToken.START_ARRAY) - { - throw new IllegalStateException("Expected content to be an array"); - } - - subscriber.onSubscribe(new JsonFileSubscription(subscriber, parser)); - } - catch (NoSuchFileException e) - { - log.info("{} does not exist - starting with empty ChatHome", path); - subscriber.onSubscribe(new ReplaySubscription(subscriber, List.of())); - } - catch (IOException | IllegalStateException e) - { - subscriber.onSubscribe(new ReplaySubscription(subscriber, List.of((s -> s.onError(e))))); - } - } - - @RequiredArgsConstructor - private class JsonFileSubscription implements Subscription - { - private final Subscriber subscriber; - private final JsonParser parser; - - @Override - public void request(long requested) - { - try - { - while (requested > 0 && parser.nextToken() != JsonToken.END_ARRAY) - { - subscriber.onNext(mapper.readValue(parser, type)); - requested--; - } - - if (requested > 0) - subscriber.onComplete(); - } - catch (IOException e) - { - subscriber.onError(e); - } - } - - @Override - public void cancel() {} - } - - private class ReplaySubscription implements Subscription - { - private final Subscriber subscriber; - private final Iterator>> iterator; - - ReplaySubscription( - Subscriber subscriber, - Iterable>> actions) - { - this.subscriber = subscriber; - this.iterator = actions.iterator(); - } - - @Override - public void request(long requested) - { - while (requested > 0 && iterator.hasNext()) - { - iterator.next().accept(subscriber); - requested--; - } - - if (requested > 0) - subscriber.onComplete(); - } - - @Override - public void cancel() {} - } -} diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/ChatRoomRepository.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/ChatRoomRepository.java deleted file mode 100644 index 12e5b968..00000000 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/ChatRoomRepository.java +++ /dev/null @@ -1,8 +0,0 @@ -package de.juplo.kafka.chat.backend.persistence.storage.mongodb; - -import org.springframework.data.mongodb.repository.MongoRepository; - - -public interface ChatRoomRepository extends MongoRepository -{ -} diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/ChatRoomTo.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/ChatRoomTo.java deleted file mode 100644 index 0086053e..00000000 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/ChatRoomTo.java +++ /dev/null @@ -1,30 +0,0 @@ -package de.juplo.kafka.chat.backend.persistence.storage.mongodb; - -import de.juplo.kafka.chat.backend.domain.ChatRoomInfo; -import lombok.*; -import org.springframework.data.annotation.Id; -import org.springframework.data.mongodb.core.mapping.Document; - - -@AllArgsConstructor -@NoArgsConstructor -@Getter(AccessLevel.PACKAGE) -@Setter(AccessLevel.PACKAGE) -@EqualsAndHashCode(of = { "id" }) -@ToString(of = { "id", "shard", "name" }) -@Document -public class ChatRoomTo -{ - @Id - private String id; - private Integer shard; - private String name; - - public static ChatRoomTo from(ChatRoomInfo chatRoomInfo) - { - return new ChatRoomTo( - chatRoomInfo.getId().toString(), - chatRoomInfo.getShard(), - chatRoomInfo.getName()); - } -} diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/MessageRepository.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/MessageRepository.java deleted file mode 100644 index a429f96e..00000000 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/MessageRepository.java +++ /dev/null @@ -1,11 +0,0 @@ -package de.juplo.kafka.chat.backend.persistence.storage.mongodb; - -import org.springframework.data.mongodb.repository.MongoRepository; - -import java.util.List; - - -public interface MessageRepository extends MongoRepository -{ - List findByChatRoomIdOrderBySerialAsc(String chatRoomId); -} diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/MessageTo.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/MessageTo.java deleted file mode 100644 index f6c6b85d..00000000 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/MessageTo.java +++ /dev/null @@ -1,54 +0,0 @@ -package de.juplo.kafka.chat.backend.persistence.storage.mongodb; - -import de.juplo.kafka.chat.backend.domain.Message; -import lombok.*; -import org.springframework.data.mongodb.core.index.Indexed; -import org.springframework.data.mongodb.core.mapping.Document; -import org.springframework.data.mongodb.core.mapping.Field; - -import java.time.LocalDateTime; -import java.util.UUID; - - -@AllArgsConstructor -@NoArgsConstructor -@Getter(AccessLevel.PACKAGE) -@Setter(AccessLevel.PACKAGE) -@EqualsAndHashCode(of = { "chatRoomId", "user", "id" }) -@ToString(of = { "chatRoomId", "user", "id" }) -@Document -class MessageTo -{ - @Indexed - private String chatRoomId; - @Indexed - private String user; - @Field("id") - @Indexed - private Long id; - @Indexed - private Long serial; - private String time; - private String text; - - Message toMessage() - { - return new Message( - Message.MessageKey.of(user, id), - serial, - LocalDateTime.parse(time), - text); - } - - static MessageTo from(UUID chatRoomId, Message message) - { - return - new MessageTo( - chatRoomId.toString(), - message.getUsername(), - message.getId(), - message.getSerialNumber(), - message.getTimestamp().toString(), - message.getMessageText()); - } -} diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/MongoDbStorageConfiguration.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/MongoDbStorageConfiguration.java deleted file mode 100644 index e6f71490..00000000 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/MongoDbStorageConfiguration.java +++ /dev/null @@ -1,28 +0,0 @@ -package de.juplo.kafka.chat.backend.persistence.storage.mongodb; - -import de.juplo.kafka.chat.backend.persistence.ShardingStrategy; -import de.juplo.kafka.chat.backend.persistence.StorageStrategy; -import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; - - -@ConditionalOnProperty( - prefix = "chat.backend.inmemory", - name = "storage-strategy", - havingValue = "mongodb") -@Configuration -public class MongoDbStorageConfiguration -{ - @Bean - public StorageStrategy storageStrategy( - ChatRoomRepository chatRoomRepository, - MessageRepository messageRepository, - ShardingStrategy shardingStrategy) - { - return new MongoDbStorageStrategy( - chatRoomRepository, - messageRepository, - shardingStrategy); - } -} diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/MongoDbStorageStrategy.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/MongoDbStorageStrategy.java deleted file mode 100644 index 644ab887..00000000 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/MongoDbStorageStrategy.java +++ /dev/null @@ -1,69 +0,0 @@ -package de.juplo.kafka.chat.backend.persistence.storage.mongodb; - -import de.juplo.kafka.chat.backend.domain.ChatRoomInfo; -import de.juplo.kafka.chat.backend.domain.Message; -import de.juplo.kafka.chat.backend.persistence.ShardingStrategy; -import de.juplo.kafka.chat.backend.persistence.StorageStrategy; -import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; -import reactor.core.publisher.Flux; - -import java.util.UUID; - - -@RequiredArgsConstructor -@Slf4j -public class MongoDbStorageStrategy implements StorageStrategy -{ - private final ChatRoomRepository chatRoomRepository; - private final MessageRepository messageRepository; - private final ShardingStrategy shardingStrategy; - - - @Override - public void writeChatRoomInfo(Flux chatRoomInfoFlux) - { - chatRoomInfoFlux - .map(ChatRoomTo::from) - .subscribe(chatroomTo -> chatRoomRepository.save(chatroomTo)); - } - - @Override - public Flux readChatRoomInfo() - { - return Flux - .fromIterable(chatRoomRepository.findAll()) - .map(chatRoomTo -> - { - UUID chatRoomId = UUID.fromString(chatRoomTo.getId()); - int shard = shardingStrategy.selectShard(chatRoomId); - - log.info( - "{} - old shard: {}, new shard: {}", - chatRoomId, - chatRoomTo.getShard(), - shard); - - return new ChatRoomInfo( - chatRoomId, - chatRoomTo.getName(), - shard); - }); - } - - @Override - public void writeChatRoomData(UUID chatRoomId, Flux messageFlux) - { - messageFlux - .map(message -> MessageTo.from(chatRoomId, message)) - .subscribe(messageTo -> messageRepository.save(messageTo)); - } - - @Override - public Flux readChatRoomData(UUID chatRoomId) - { - return Flux - .fromIterable(messageRepository.findByChatRoomIdOrderBySerialAsc(chatRoomId.toString())) - .map(messageTo -> messageTo.toMessage()); - } -} diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/nostorage/NoStorageStorageConfiguration.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/nostorage/NoStorageStorageConfiguration.java deleted file mode 100644 index ab24bb8a..00000000 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/nostorage/NoStorageStorageConfiguration.java +++ /dev/null @@ -1,53 +0,0 @@ -package de.juplo.kafka.chat.backend.persistence.storage.nostorage; - -import de.juplo.kafka.chat.backend.domain.ChatRoomInfo; -import de.juplo.kafka.chat.backend.domain.Message; -import de.juplo.kafka.chat.backend.persistence.StorageStrategy; -import org.springframework.boot.autoconfigure.EnableAutoConfiguration; -import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; -import org.springframework.boot.autoconfigure.data.mongo.MongoRepositoriesAutoConfiguration; -import org.springframework.boot.autoconfigure.mongo.MongoAutoConfiguration; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; -import reactor.core.publisher.Flux; - -import java.util.UUID; - - -@ConditionalOnProperty( - prefix = "chat.backend.inmemory", - name = "storage-strategy", - havingValue = "none", - matchIfMissing = true) -@Configuration -@EnableAutoConfiguration( - exclude = { - MongoRepositoriesAutoConfiguration.class, - MongoAutoConfiguration.class }) -public class NoStorageStorageConfiguration -{ - @Bean - public StorageStrategy storageStrategy() - { - return new StorageStrategy() - { - @Override - public void writeChatRoomInfo(Flux chatRoomInfoFlux) {} - - @Override - public Flux readChatRoomInfo() - { - return Flux.empty(); - } - - @Override - public void writeChatRoomData(UUID chatRoomId, Flux messageFlux) {} - - @Override - public Flux readChatRoomData(UUID chatRoomId) - { - return Flux.empty(); - } - }; - } -} diff --git a/src/main/java/de/juplo/kafka/chat/backend/storage/files/FilesStorageConfiguration.java b/src/main/java/de/juplo/kafka/chat/backend/storage/files/FilesStorageConfiguration.java new file mode 100644 index 00000000..3a59acb6 --- /dev/null +++ b/src/main/java/de/juplo/kafka/chat/backend/storage/files/FilesStorageConfiguration.java @@ -0,0 +1,39 @@ +package de.juplo.kafka.chat.backend.persistence.storage.files; + +import com.fasterxml.jackson.databind.ObjectMapper; +import de.juplo.kafka.chat.backend.ChatBackendProperties; +import de.juplo.kafka.chat.backend.persistence.ShardingStrategy; +import de.juplo.kafka.chat.backend.persistence.StorageStrategy; +import org.springframework.boot.autoconfigure.EnableAutoConfiguration; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.boot.autoconfigure.data.mongo.MongoRepositoriesAutoConfiguration; +import org.springframework.boot.autoconfigure.mongo.MongoAutoConfiguration; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +import java.nio.file.Paths; + + +@ConditionalOnProperty( + prefix = "chat.backend.inmemory", + name = "storage-strategy", + havingValue = "files") +@Configuration +@EnableAutoConfiguration( + exclude = { + MongoRepositoriesAutoConfiguration.class, + MongoAutoConfiguration.class }) +public class FilesStorageConfiguration +{ + @Bean + public StorageStrategy storageStrategy( + ChatBackendProperties properties, + ShardingStrategy shardingStrategy, + ObjectMapper mapper) + { + return new FilesStorageStrategy( + Paths.get(properties.getInmemory().getStorageDirectory()), + shardingStrategy, + mapper); + } +} diff --git a/src/main/java/de/juplo/kafka/chat/backend/storage/files/FilesStorageStrategy.java b/src/main/java/de/juplo/kafka/chat/backend/storage/files/FilesStorageStrategy.java new file mode 100644 index 00000000..9c791977 --- /dev/null +++ b/src/main/java/de/juplo/kafka/chat/backend/storage/files/FilesStorageStrategy.java @@ -0,0 +1,201 @@ +package de.juplo.kafka.chat.backend.persistence.storage.files; + +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.JavaType; +import com.fasterxml.jackson.databind.ObjectMapper; +import de.juplo.kafka.chat.backend.api.ChatRoomInfoTo; +import de.juplo.kafka.chat.backend.api.MessageTo; +import de.juplo.kafka.chat.backend.domain.ChatRoomInfo; +import de.juplo.kafka.chat.backend.domain.Message; +import de.juplo.kafka.chat.backend.persistence.StorageStrategy; +import de.juplo.kafka.chat.backend.persistence.ShardingStrategy; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import reactor.core.publisher.Flux; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.UUID; + +import static java.nio.file.StandardOpenOption.CREATE; +import static java.nio.file.StandardOpenOption.TRUNCATE_EXISTING; + + +@RequiredArgsConstructor +@Slf4j +public class FilesStorageStrategy implements StorageStrategy +{ + public static final String CHATROOMS_FILENAME = "chatrooms.json"; + + + private final Path storagePath; + private final ShardingStrategy shardingStrategy; + private final ObjectMapper mapper; + + + @Override + public void writeChatRoomInfo(Flux chatRoomInfoFlux) + { + Path path = chatroomsPath(); + log.info("Writing chatrooms to {}", path); + try + { + Files.createDirectories(storagePath); + + JsonGenerator generator = + mapper + .getFactory() + .createGenerator(Files.newBufferedWriter(path, CREATE, TRUNCATE_EXISTING)); + + chatRoomInfoFlux + .log() + .doFirst(() -> + { + try + { + generator.useDefaultPrettyPrinter(); + generator.writeStartArray(); + } + catch (IOException e) + { + throw new RuntimeException(e); + } + }) + .doOnTerminate(() -> + { + try + { + generator.writeEndArray(); + generator.close(); + } + catch (IOException e) + { + throw new RuntimeException(e); + } + }) + .subscribe(chatRoomInfo -> + { + try + { + ChatRoomInfoTo chatRoomInfoTo = ChatRoomInfoTo.from(chatRoomInfo); + generator.writeObject(chatRoomInfoTo); + } + catch (IOException e) + { + throw new RuntimeException(e); + } + }); + } + catch (IOException e) + { + throw new RuntimeException(e); + } + } + + @Override + public Flux readChatRoomInfo() + { + JavaType type = mapper.getTypeFactory().constructType(ChatRoomInfoTo.class); + return Flux + .from(new JsonFilePublisher(chatroomsPath(), mapper, type)) + .log() + .map(chatRoomInfoTo -> + { + UUID chatRoomId = chatRoomInfoTo.getId(); + int shard = shardingStrategy.selectShard(chatRoomId); + + log.info( + "{} - old shard: {}, new shard: {}", + chatRoomId, + chatRoomInfoTo.getShard(), + shard); + + return new ChatRoomInfo( + chatRoomId, + chatRoomInfoTo.getName(), + shard); + }); + } + + @Override + public void writeChatRoomData( + UUID chatRoomId, + Flux messageFlux) + { + Path path = chatroomPath(chatRoomId); + log.info("Writing messages for {} to {}", chatRoomId, path); + try + { + Files.createDirectories(storagePath); + + JsonGenerator generator = + mapper + .getFactory() + .createGenerator(Files.newBufferedWriter(path, CREATE, TRUNCATE_EXISTING)); + + messageFlux + .log() + .doFirst(() -> + { + try + { + generator.useDefaultPrettyPrinter(); + generator.writeStartArray(); + } + catch (IOException e) + { + throw new RuntimeException(e); + } + }) + .doOnTerminate(() -> + { + try + { + generator.writeEndArray(); + generator.close(); + } + catch (IOException e) + { + throw new RuntimeException(e); + } + }) + .subscribe(message -> + { + try + { + MessageTo messageTo = MessageTo.from(message); + generator.writeObject(messageTo); + } + catch (IOException e) + { + throw new RuntimeException(e); + } + }); + } + catch (IOException e) + { + throw new RuntimeException(e); + } + } + + @Override + public Flux readChatRoomData(UUID chatRoomId) + { + JavaType type = mapper.getTypeFactory().constructType(MessageTo.class); + return Flux + .from(new JsonFilePublisher(chatroomPath(chatRoomId), mapper, type)) + .log() + .map(MessageTo::toMessage); + } + + Path chatroomsPath() + { + return storagePath.resolve(Path.of(CHATROOMS_FILENAME)); + } + + Path chatroomPath(UUID id) + { + return storagePath.resolve(Path.of(id.toString() + ".json")); + } +} diff --git a/src/main/java/de/juplo/kafka/chat/backend/storage/files/JsonFilePublisher.java b/src/main/java/de/juplo/kafka/chat/backend/storage/files/JsonFilePublisher.java new file mode 100644 index 00000000..aec8b367 --- /dev/null +++ b/src/main/java/de/juplo/kafka/chat/backend/storage/files/JsonFilePublisher.java @@ -0,0 +1,118 @@ +package de.juplo.kafka.chat.backend.persistence.storage.files; + +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.core.JsonToken; +import com.fasterxml.jackson.databind.JavaType; +import com.fasterxml.jackson.databind.ObjectMapper; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.reactivestreams.Publisher; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.NoSuchFileException; +import java.nio.file.Path; +import java.util.Iterator; +import java.util.List; +import java.util.function.Consumer; + + +@RequiredArgsConstructor +@Slf4j +public class JsonFilePublisher implements Publisher +{ + private final Path path; + private final ObjectMapper mapper; + private final JavaType type; + + + @Override + public void subscribe(Subscriber subscriber) + { + log.info("Reading chatrooms from {}", path); + + try + { + JsonParser parser = + mapper.getFactory().createParser(Files.newBufferedReader(path)); + + if (parser.nextToken() != JsonToken.START_ARRAY) + { + throw new IllegalStateException("Expected content to be an array"); + } + + subscriber.onSubscribe(new JsonFileSubscription(subscriber, parser)); + } + catch (NoSuchFileException e) + { + log.info("{} does not exist - starting with empty ChatHome", path); + subscriber.onSubscribe(new ReplaySubscription(subscriber, List.of())); + } + catch (IOException | IllegalStateException e) + { + subscriber.onSubscribe(new ReplaySubscription(subscriber, List.of((s -> s.onError(e))))); + } + } + + @RequiredArgsConstructor + private class JsonFileSubscription implements Subscription + { + private final Subscriber subscriber; + private final JsonParser parser; + + @Override + public void request(long requested) + { + try + { + while (requested > 0 && parser.nextToken() != JsonToken.END_ARRAY) + { + subscriber.onNext(mapper.readValue(parser, type)); + requested--; + } + + if (requested > 0) + subscriber.onComplete(); + } + catch (IOException e) + { + subscriber.onError(e); + } + } + + @Override + public void cancel() {} + } + + private class ReplaySubscription implements Subscription + { + private final Subscriber subscriber; + private final Iterator>> iterator; + + ReplaySubscription( + Subscriber subscriber, + Iterable>> actions) + { + this.subscriber = subscriber; + this.iterator = actions.iterator(); + } + + @Override + public void request(long requested) + { + while (requested > 0 && iterator.hasNext()) + { + iterator.next().accept(subscriber); + requested--; + } + + if (requested > 0) + subscriber.onComplete(); + } + + @Override + public void cancel() {} + } +} diff --git a/src/main/java/de/juplo/kafka/chat/backend/storage/mongodb/ChatRoomRepository.java b/src/main/java/de/juplo/kafka/chat/backend/storage/mongodb/ChatRoomRepository.java new file mode 100644 index 00000000..12e5b968 --- /dev/null +++ b/src/main/java/de/juplo/kafka/chat/backend/storage/mongodb/ChatRoomRepository.java @@ -0,0 +1,8 @@ +package de.juplo.kafka.chat.backend.persistence.storage.mongodb; + +import org.springframework.data.mongodb.repository.MongoRepository; + + +public interface ChatRoomRepository extends MongoRepository +{ +} diff --git a/src/main/java/de/juplo/kafka/chat/backend/storage/mongodb/ChatRoomTo.java b/src/main/java/de/juplo/kafka/chat/backend/storage/mongodb/ChatRoomTo.java new file mode 100644 index 00000000..0086053e --- /dev/null +++ b/src/main/java/de/juplo/kafka/chat/backend/storage/mongodb/ChatRoomTo.java @@ -0,0 +1,30 @@ +package de.juplo.kafka.chat.backend.persistence.storage.mongodb; + +import de.juplo.kafka.chat.backend.domain.ChatRoomInfo; +import lombok.*; +import org.springframework.data.annotation.Id; +import org.springframework.data.mongodb.core.mapping.Document; + + +@AllArgsConstructor +@NoArgsConstructor +@Getter(AccessLevel.PACKAGE) +@Setter(AccessLevel.PACKAGE) +@EqualsAndHashCode(of = { "id" }) +@ToString(of = { "id", "shard", "name" }) +@Document +public class ChatRoomTo +{ + @Id + private String id; + private Integer shard; + private String name; + + public static ChatRoomTo from(ChatRoomInfo chatRoomInfo) + { + return new ChatRoomTo( + chatRoomInfo.getId().toString(), + chatRoomInfo.getShard(), + chatRoomInfo.getName()); + } +} diff --git a/src/main/java/de/juplo/kafka/chat/backend/storage/mongodb/MessageRepository.java b/src/main/java/de/juplo/kafka/chat/backend/storage/mongodb/MessageRepository.java new file mode 100644 index 00000000..a429f96e --- /dev/null +++ b/src/main/java/de/juplo/kafka/chat/backend/storage/mongodb/MessageRepository.java @@ -0,0 +1,11 @@ +package de.juplo.kafka.chat.backend.persistence.storage.mongodb; + +import org.springframework.data.mongodb.repository.MongoRepository; + +import java.util.List; + + +public interface MessageRepository extends MongoRepository +{ + List findByChatRoomIdOrderBySerialAsc(String chatRoomId); +} diff --git a/src/main/java/de/juplo/kafka/chat/backend/storage/mongodb/MessageTo.java b/src/main/java/de/juplo/kafka/chat/backend/storage/mongodb/MessageTo.java new file mode 100644 index 00000000..f6c6b85d --- /dev/null +++ b/src/main/java/de/juplo/kafka/chat/backend/storage/mongodb/MessageTo.java @@ -0,0 +1,54 @@ +package de.juplo.kafka.chat.backend.persistence.storage.mongodb; + +import de.juplo.kafka.chat.backend.domain.Message; +import lombok.*; +import org.springframework.data.mongodb.core.index.Indexed; +import org.springframework.data.mongodb.core.mapping.Document; +import org.springframework.data.mongodb.core.mapping.Field; + +import java.time.LocalDateTime; +import java.util.UUID; + + +@AllArgsConstructor +@NoArgsConstructor +@Getter(AccessLevel.PACKAGE) +@Setter(AccessLevel.PACKAGE) +@EqualsAndHashCode(of = { "chatRoomId", "user", "id" }) +@ToString(of = { "chatRoomId", "user", "id" }) +@Document +class MessageTo +{ + @Indexed + private String chatRoomId; + @Indexed + private String user; + @Field("id") + @Indexed + private Long id; + @Indexed + private Long serial; + private String time; + private String text; + + Message toMessage() + { + return new Message( + Message.MessageKey.of(user, id), + serial, + LocalDateTime.parse(time), + text); + } + + static MessageTo from(UUID chatRoomId, Message message) + { + return + new MessageTo( + chatRoomId.toString(), + message.getUsername(), + message.getId(), + message.getSerialNumber(), + message.getTimestamp().toString(), + message.getMessageText()); + } +} diff --git a/src/main/java/de/juplo/kafka/chat/backend/storage/mongodb/MongoDbStorageConfiguration.java b/src/main/java/de/juplo/kafka/chat/backend/storage/mongodb/MongoDbStorageConfiguration.java new file mode 100644 index 00000000..e6f71490 --- /dev/null +++ b/src/main/java/de/juplo/kafka/chat/backend/storage/mongodb/MongoDbStorageConfiguration.java @@ -0,0 +1,28 @@ +package de.juplo.kafka.chat.backend.persistence.storage.mongodb; + +import de.juplo.kafka.chat.backend.persistence.ShardingStrategy; +import de.juplo.kafka.chat.backend.persistence.StorageStrategy; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + + +@ConditionalOnProperty( + prefix = "chat.backend.inmemory", + name = "storage-strategy", + havingValue = "mongodb") +@Configuration +public class MongoDbStorageConfiguration +{ + @Bean + public StorageStrategy storageStrategy( + ChatRoomRepository chatRoomRepository, + MessageRepository messageRepository, + ShardingStrategy shardingStrategy) + { + return new MongoDbStorageStrategy( + chatRoomRepository, + messageRepository, + shardingStrategy); + } +} diff --git a/src/main/java/de/juplo/kafka/chat/backend/storage/mongodb/MongoDbStorageStrategy.java b/src/main/java/de/juplo/kafka/chat/backend/storage/mongodb/MongoDbStorageStrategy.java new file mode 100644 index 00000000..644ab887 --- /dev/null +++ b/src/main/java/de/juplo/kafka/chat/backend/storage/mongodb/MongoDbStorageStrategy.java @@ -0,0 +1,69 @@ +package de.juplo.kafka.chat.backend.persistence.storage.mongodb; + +import de.juplo.kafka.chat.backend.domain.ChatRoomInfo; +import de.juplo.kafka.chat.backend.domain.Message; +import de.juplo.kafka.chat.backend.persistence.ShardingStrategy; +import de.juplo.kafka.chat.backend.persistence.StorageStrategy; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import reactor.core.publisher.Flux; + +import java.util.UUID; + + +@RequiredArgsConstructor +@Slf4j +public class MongoDbStorageStrategy implements StorageStrategy +{ + private final ChatRoomRepository chatRoomRepository; + private final MessageRepository messageRepository; + private final ShardingStrategy shardingStrategy; + + + @Override + public void writeChatRoomInfo(Flux chatRoomInfoFlux) + { + chatRoomInfoFlux + .map(ChatRoomTo::from) + .subscribe(chatroomTo -> chatRoomRepository.save(chatroomTo)); + } + + @Override + public Flux readChatRoomInfo() + { + return Flux + .fromIterable(chatRoomRepository.findAll()) + .map(chatRoomTo -> + { + UUID chatRoomId = UUID.fromString(chatRoomTo.getId()); + int shard = shardingStrategy.selectShard(chatRoomId); + + log.info( + "{} - old shard: {}, new shard: {}", + chatRoomId, + chatRoomTo.getShard(), + shard); + + return new ChatRoomInfo( + chatRoomId, + chatRoomTo.getName(), + shard); + }); + } + + @Override + public void writeChatRoomData(UUID chatRoomId, Flux messageFlux) + { + messageFlux + .map(message -> MessageTo.from(chatRoomId, message)) + .subscribe(messageTo -> messageRepository.save(messageTo)); + } + + @Override + public Flux readChatRoomData(UUID chatRoomId) + { + return Flux + .fromIterable(messageRepository.findByChatRoomIdOrderBySerialAsc(chatRoomId.toString())) + .map(messageTo -> messageTo.toMessage()); + } +} diff --git a/src/main/java/de/juplo/kafka/chat/backend/storage/nostorage/NoStorageStorageConfiguration.java b/src/main/java/de/juplo/kafka/chat/backend/storage/nostorage/NoStorageStorageConfiguration.java new file mode 100644 index 00000000..ab24bb8a --- /dev/null +++ b/src/main/java/de/juplo/kafka/chat/backend/storage/nostorage/NoStorageStorageConfiguration.java @@ -0,0 +1,53 @@ +package de.juplo.kafka.chat.backend.persistence.storage.nostorage; + +import de.juplo.kafka.chat.backend.domain.ChatRoomInfo; +import de.juplo.kafka.chat.backend.domain.Message; +import de.juplo.kafka.chat.backend.persistence.StorageStrategy; +import org.springframework.boot.autoconfigure.EnableAutoConfiguration; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.boot.autoconfigure.data.mongo.MongoRepositoriesAutoConfiguration; +import org.springframework.boot.autoconfigure.mongo.MongoAutoConfiguration; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import reactor.core.publisher.Flux; + +import java.util.UUID; + + +@ConditionalOnProperty( + prefix = "chat.backend.inmemory", + name = "storage-strategy", + havingValue = "none", + matchIfMissing = true) +@Configuration +@EnableAutoConfiguration( + exclude = { + MongoRepositoriesAutoConfiguration.class, + MongoAutoConfiguration.class }) +public class NoStorageStorageConfiguration +{ + @Bean + public StorageStrategy storageStrategy() + { + return new StorageStrategy() + { + @Override + public void writeChatRoomInfo(Flux chatRoomInfoFlux) {} + + @Override + public Flux readChatRoomInfo() + { + return Flux.empty(); + } + + @Override + public void writeChatRoomData(UUID chatRoomId, Flux messageFlux) {} + + @Override + public Flux readChatRoomData(UUID chatRoomId) + { + return Flux.empty(); + } + }; + } +} diff --git a/src/test/java/de/juplo/kafka/chat/backend/AbstractInMemoryStorageIT.java b/src/test/java/de/juplo/kafka/chat/backend/AbstractInMemoryStorageIT.java new file mode 100644 index 00000000..3d311a4e --- /dev/null +++ b/src/test/java/de/juplo/kafka/chat/backend/AbstractInMemoryStorageIT.java @@ -0,0 +1,36 @@ +package de.juplo.kafka.chat.backend.persistence; + +import de.juplo.kafka.chat.backend.domain.ChatHomeService; +import de.juplo.kafka.chat.backend.persistence.inmemory.SimpleChatHomeService; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; + +import java.time.Clock; + + +@RequiredArgsConstructor +@Slf4j +public abstract class AbstractInMemoryStorageIT extends AbstractStorageStrategyIT +{ + final Clock clock; + + @Override + protected StorageStrategyITConfig getConfig() + { + return new StorageStrategyITConfig() + { + int bufferSize = 8; + + SimpleChatHomeService simpleChatHome = new SimpleChatHomeService( + getStorageStrategy(), + clock, + bufferSize); + + @Override + public ChatHomeService getChatHome() + { + return simpleChatHome; + } + }; + } +} diff --git a/src/test/java/de/juplo/kafka/chat/backend/AbstractStorageStrategyIT.java b/src/test/java/de/juplo/kafka/chat/backend/AbstractStorageStrategyIT.java new file mode 100644 index 00000000..c2149bba --- /dev/null +++ b/src/test/java/de/juplo/kafka/chat/backend/AbstractStorageStrategyIT.java @@ -0,0 +1,119 @@ +package de.juplo.kafka.chat.backend.persistence; + +import de.juplo.kafka.chat.backend.domain.*; +import lombok.extern.slf4j.Slf4j; +import org.junit.jupiter.api.Test; + +import java.util.List; +import java.util.UUID; + +import static pl.rzrz.assertj.reactor.Assertions.*; + + +@Slf4j +public abstract class AbstractStorageStrategyIT +{ + protected ChatHomeService chathome; + + + protected abstract StorageStrategy getStorageStrategy(); + protected abstract StorageStrategyITConfig getConfig(); + + protected void start() + { + StorageStrategyITConfig config = getConfig(); + chathome = config.getChatHome(); + } + + protected void stop() + { + getStorageStrategy().write(chathome); + } + + @Test + protected void testStoreAndRecreate() + { + start(); + + assertThat(chathome.getChatRoomInfo().toStream()).hasSize(0); + + UUID chatRoomId = UUID.fromString("5c73531c-6fc4-426c-adcb-afc5c140a0f7"); + ChatRoomInfo info = chathome.createChatRoom(chatRoomId, "FOO").block(); + log.debug("Created chat-room {}", info); + ChatRoomData chatroom = chathome.getChatRoomData(chatRoomId).block(); + Message m1 = chatroom.addMessage(1l,"peter", "Hallo, ich heiße Peter!").block(); + Message m2 = chatroom.addMessage(1l, "ute", "Ich bin Ute...").block(); + Message m3 = chatroom.addMessage(2l, "peter", "Willst du mit mir gehen?").block(); + Message m4 = chatroom.addMessage(1l, "klaus", "Ja? Nein? Vielleicht??").block(); + + assertThat(chathome.getChatRoomInfo().toStream()).containsExactlyElementsOf(List.of(info)); + assertThat(chathome.getChatRoomInfo(chatRoomId)).emitsExactly(info); + assertThat(chathome + .getChatRoomData(chatRoomId) + .flatMapMany(cr -> cr.getMessages())).emitsExactly(m1, m2, m3, m4); + + stop(); + start(); + + assertThat(chathome.getChatRoomInfo().toStream()).containsExactlyElementsOf(List.of(info)); + assertThat(chathome.getChatRoomInfo(chatRoomId)).emitsExactly(info); + assertThat(chathome + .getChatRoomData(chatRoomId) + .flatMapMany(cr -> cr.getMessages())).emitsExactly(m1, m2, m3, m4); + } + + @Test + protected void testStoreAndRecreateParallelChatRooms() + { + start(); + + assertThat(chathome.getChatRoomInfo().toStream()).hasSize(0); + + UUID chatRoomAId = UUID.fromString("5c73531c-6fc4-426c-adcb-afc5c140a0f7"); + ChatRoomInfo infoA = chathome.createChatRoom(chatRoomAId, "FOO").block(); + log.debug("Created chat-room {}", infoA); + ChatRoomData chatroomA = chathome.getChatRoomData(chatRoomAId).block(); + Message ma1 = chatroomA.addMessage(1l,"peter", "Hallo, ich heiße Peter!").block(); + Message ma2 = chatroomA.addMessage(1l, "ute", "Ich bin Ute...").block(); + Message ma3 = chatroomA.addMessage(2l, "peter", "Willst du mit mir gehen?").block(); + Message ma4 = chatroomA.addMessage(1l, "klaus", "Ja? Nein? Vielleicht??").block(); + + UUID chatRoomBId = UUID.fromString("8763dfdc-4dda-4a74-bea4-4b389177abea"); + ChatRoomInfo infoB = chathome.createChatRoom(chatRoomBId, "BAR").block(); + log.debug("Created chat-room {}", infoB); + ChatRoomData chatroomB = chathome.getChatRoomData(chatRoomBId).block(); + Message mb1 = chatroomB.addMessage(1l,"peter", "Hallo, ich heiße Uwe!").block(); + Message mb2 = chatroomB.addMessage(1l, "ute", "Ich bin Ute...").block(); + Message mb3 = chatroomB.addMessage(1l, "klaus", "Willst du mit mir gehen?").block(); + Message mb4 = chatroomB.addMessage(2l, "peter", "Hä? Was jetzt?!? Isch glohb isch höb ühn däjah vüh...").block(); + + assertThat(chathome.getChatRoomInfo().toStream()).containsExactlyInAnyOrderElementsOf(List.of(infoA, infoB)); + assertThat(chathome.getChatRoomInfo(chatRoomAId)).emitsExactly(infoA); + assertThat(chathome + .getChatRoomData(chatRoomAId) + .flatMapMany(cr -> cr.getMessages())).emitsExactly(ma1, ma2, ma3, ma4); + assertThat(chathome.getChatRoomData(chatRoomBId)).emitsExactly(chatroomB); + assertThat(chathome + .getChatRoomData(chatRoomBId) + .flatMapMany(cr -> cr.getMessages())).emitsExactly(mb1, mb2, mb3, mb4); + + stop(); + start(); + + assertThat(chathome.getChatRoomInfo().toStream()).containsExactlyInAnyOrderElementsOf(List.of(infoA, infoB)); + assertThat(chathome.getChatRoomInfo(chatRoomAId)).emitsExactly(infoA); + assertThat(chathome + .getChatRoomData(chatRoomAId) + .flatMapMany(cr -> cr.getMessages())).emitsExactly(ma1, ma2, ma3, ma4); + assertThat(chathome.getChatRoomInfo(chatRoomBId)).emitsExactly(infoB); + assertThat(chathome + .getChatRoomData(chatRoomBId) + .flatMapMany(cr -> cr.getMessages())).emitsExactly(mb1, mb2, mb3, mb4); + } + + + interface StorageStrategyITConfig + { + ChatHomeService getChatHome(); + } +} diff --git a/src/test/java/de/juplo/kafka/chat/backend/InMemoryWithFilesStorageIT.java b/src/test/java/de/juplo/kafka/chat/backend/InMemoryWithFilesStorageIT.java new file mode 100644 index 00000000..be40eed2 --- /dev/null +++ b/src/test/java/de/juplo/kafka/chat/backend/InMemoryWithFilesStorageIT.java @@ -0,0 +1,71 @@ +package de.juplo.kafka.chat.backend.persistence; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.SerializationFeature; +import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; +import de.juplo.kafka.chat.backend.persistence.storage.files.FilesStorageStrategy; +import lombok.extern.slf4j.Slf4j; +import org.junit.jupiter.api.BeforeEach; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.time.Clock; + + +@Slf4j +public class InMemoryWithFilesStorageIT extends AbstractInMemoryStorageIT +{ + final static Path path = Paths.get("target","files"); + + final ObjectMapper mapper; + final FilesStorageStrategy storageStrategy; + + + public InMemoryWithFilesStorageIT() + { + super(Clock.systemDefaultZone()); + mapper = new ObjectMapper(); + mapper.registerModule(new JavaTimeModule()); + mapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS); + storageStrategy = new FilesStorageStrategy( + path, + chatRoomId -> 0, + mapper); + } + + + @Override + protected StorageStrategy getStorageStrategy() + { + return storageStrategy; + } + + @BeforeEach + void reset() throws Exception + { + if (Files.exists(path)) + { + Files + .walk(path) + .forEach(file -> + { + try + { + if (!file.equals(path)) + { + log.debug("Deleting file {}", file); + Files.delete(file); + } + } + catch (IOException e) + { + throw new RuntimeException(e); + } + }); + log.debug("Deleting data-directory {}", path); + Files.delete(path); + } + } +} diff --git a/src/test/java/de/juplo/kafka/chat/backend/InMemoryWithMongoDbStorageIT.java b/src/test/java/de/juplo/kafka/chat/backend/InMemoryWithMongoDbStorageIT.java new file mode 100644 index 00000000..a0dab37c --- /dev/null +++ b/src/test/java/de/juplo/kafka/chat/backend/InMemoryWithMongoDbStorageIT.java @@ -0,0 +1,107 @@ +package de.juplo.kafka.chat.backend.persistence; + +import de.juplo.kafka.chat.backend.persistence.InMemoryWithMongoDbStorageIT.DataSourceInitializer; +import de.juplo.kafka.chat.backend.persistence.storage.mongodb.ChatRoomRepository; +import de.juplo.kafka.chat.backend.persistence.storage.mongodb.MessageRepository; +import de.juplo.kafka.chat.backend.persistence.storage.mongodb.MongoDbStorageStrategy; +import lombok.extern.slf4j.Slf4j; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.extension.ExtendWith; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.EnableAutoConfiguration; +import org.springframework.boot.test.autoconfigure.data.mongo.AutoConfigureDataMongo; +import org.springframework.boot.test.context.TestConfiguration; +import org.springframework.context.ApplicationContextInitializer; +import org.springframework.context.ConfigurableApplicationContext; +import org.springframework.context.annotation.Bean; +import org.springframework.test.context.ContextConfiguration; +import org.springframework.test.context.junit.jupiter.SpringExtension; +import org.springframework.test.context.support.TestPropertySourceUtils; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; + +import java.time.Clock; + + +@Testcontainers +@ExtendWith({SpringExtension.class}) +@EnableAutoConfiguration +@AutoConfigureDataMongo +@ContextConfiguration(initializers = DataSourceInitializer.class) +@Slf4j +public class InMemoryWithMongoDbStorageIT extends AbstractInMemoryStorageIT +{ + @Autowired + MongoDbStorageStrategy storageStrategy; + @Autowired + ChatRoomRepository chatRoomRepository; + @Autowired + MessageRepository messageRepository; + + + public InMemoryWithMongoDbStorageIT() + { + super(Clock.systemDefaultZone()); + } + + + @Override + protected StorageStrategy getStorageStrategy() + { + return storageStrategy; + } + + @TestConfiguration + static class InMemoryWithMongoDbStorageStrategyITConfig + { + @Bean + MongoDbStorageStrategy storageStrategy( + ChatRoomRepository chatRoomRepository, + MessageRepository messageRepository, + Clock clock) + { + return new MongoDbStorageStrategy( + chatRoomRepository, + messageRepository, + chatRoomId -> 0); + } + + @Bean + Clock clock() + { + return Clock.systemDefaultZone(); + } + } + + private static final int MONGODB_PORT = 27017; + + @Container + private static final GenericContainer CONTAINER = + new GenericContainer("mongo:6") + .withExposedPorts(MONGODB_PORT); + + public static class DataSourceInitializer + implements ApplicationContextInitializer + { + @Override + public void initialize(ConfigurableApplicationContext applicationContext) + { + TestPropertySourceUtils.addInlinedPropertiesToEnvironment( + applicationContext, + "spring.data.mongodb.host=localhost", + "spring.data.mongodb.port=" + CONTAINER.getMappedPort(MONGODB_PORT), + "spring.data.mongodb.database=test"); + } + } + + @BeforeEach + void setUpLogging() + { + Slf4jLogConsumer logConsumer = new Slf4jLogConsumer(log); + CONTAINER.followOutput(logConsumer); + chatRoomRepository.deleteAll(); + messageRepository.deleteAll(); + } +} diff --git a/src/test/java/de/juplo/kafka/chat/backend/persistence/AbstractInMemoryStorageIT.java b/src/test/java/de/juplo/kafka/chat/backend/persistence/AbstractInMemoryStorageIT.java deleted file mode 100644 index 3d311a4e..00000000 --- a/src/test/java/de/juplo/kafka/chat/backend/persistence/AbstractInMemoryStorageIT.java +++ /dev/null @@ -1,36 +0,0 @@ -package de.juplo.kafka.chat.backend.persistence; - -import de.juplo.kafka.chat.backend.domain.ChatHomeService; -import de.juplo.kafka.chat.backend.persistence.inmemory.SimpleChatHomeService; -import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; - -import java.time.Clock; - - -@RequiredArgsConstructor -@Slf4j -public abstract class AbstractInMemoryStorageIT extends AbstractStorageStrategyIT -{ - final Clock clock; - - @Override - protected StorageStrategyITConfig getConfig() - { - return new StorageStrategyITConfig() - { - int bufferSize = 8; - - SimpleChatHomeService simpleChatHome = new SimpleChatHomeService( - getStorageStrategy(), - clock, - bufferSize); - - @Override - public ChatHomeService getChatHome() - { - return simpleChatHome; - } - }; - } -} diff --git a/src/test/java/de/juplo/kafka/chat/backend/persistence/AbstractStorageStrategyIT.java b/src/test/java/de/juplo/kafka/chat/backend/persistence/AbstractStorageStrategyIT.java deleted file mode 100644 index c2149bba..00000000 --- a/src/test/java/de/juplo/kafka/chat/backend/persistence/AbstractStorageStrategyIT.java +++ /dev/null @@ -1,119 +0,0 @@ -package de.juplo.kafka.chat.backend.persistence; - -import de.juplo.kafka.chat.backend.domain.*; -import lombok.extern.slf4j.Slf4j; -import org.junit.jupiter.api.Test; - -import java.util.List; -import java.util.UUID; - -import static pl.rzrz.assertj.reactor.Assertions.*; - - -@Slf4j -public abstract class AbstractStorageStrategyIT -{ - protected ChatHomeService chathome; - - - protected abstract StorageStrategy getStorageStrategy(); - protected abstract StorageStrategyITConfig getConfig(); - - protected void start() - { - StorageStrategyITConfig config = getConfig(); - chathome = config.getChatHome(); - } - - protected void stop() - { - getStorageStrategy().write(chathome); - } - - @Test - protected void testStoreAndRecreate() - { - start(); - - assertThat(chathome.getChatRoomInfo().toStream()).hasSize(0); - - UUID chatRoomId = UUID.fromString("5c73531c-6fc4-426c-adcb-afc5c140a0f7"); - ChatRoomInfo info = chathome.createChatRoom(chatRoomId, "FOO").block(); - log.debug("Created chat-room {}", info); - ChatRoomData chatroom = chathome.getChatRoomData(chatRoomId).block(); - Message m1 = chatroom.addMessage(1l,"peter", "Hallo, ich heiße Peter!").block(); - Message m2 = chatroom.addMessage(1l, "ute", "Ich bin Ute...").block(); - Message m3 = chatroom.addMessage(2l, "peter", "Willst du mit mir gehen?").block(); - Message m4 = chatroom.addMessage(1l, "klaus", "Ja? Nein? Vielleicht??").block(); - - assertThat(chathome.getChatRoomInfo().toStream()).containsExactlyElementsOf(List.of(info)); - assertThat(chathome.getChatRoomInfo(chatRoomId)).emitsExactly(info); - assertThat(chathome - .getChatRoomData(chatRoomId) - .flatMapMany(cr -> cr.getMessages())).emitsExactly(m1, m2, m3, m4); - - stop(); - start(); - - assertThat(chathome.getChatRoomInfo().toStream()).containsExactlyElementsOf(List.of(info)); - assertThat(chathome.getChatRoomInfo(chatRoomId)).emitsExactly(info); - assertThat(chathome - .getChatRoomData(chatRoomId) - .flatMapMany(cr -> cr.getMessages())).emitsExactly(m1, m2, m3, m4); - } - - @Test - protected void testStoreAndRecreateParallelChatRooms() - { - start(); - - assertThat(chathome.getChatRoomInfo().toStream()).hasSize(0); - - UUID chatRoomAId = UUID.fromString("5c73531c-6fc4-426c-adcb-afc5c140a0f7"); - ChatRoomInfo infoA = chathome.createChatRoom(chatRoomAId, "FOO").block(); - log.debug("Created chat-room {}", infoA); - ChatRoomData chatroomA = chathome.getChatRoomData(chatRoomAId).block(); - Message ma1 = chatroomA.addMessage(1l,"peter", "Hallo, ich heiße Peter!").block(); - Message ma2 = chatroomA.addMessage(1l, "ute", "Ich bin Ute...").block(); - Message ma3 = chatroomA.addMessage(2l, "peter", "Willst du mit mir gehen?").block(); - Message ma4 = chatroomA.addMessage(1l, "klaus", "Ja? Nein? Vielleicht??").block(); - - UUID chatRoomBId = UUID.fromString("8763dfdc-4dda-4a74-bea4-4b389177abea"); - ChatRoomInfo infoB = chathome.createChatRoom(chatRoomBId, "BAR").block(); - log.debug("Created chat-room {}", infoB); - ChatRoomData chatroomB = chathome.getChatRoomData(chatRoomBId).block(); - Message mb1 = chatroomB.addMessage(1l,"peter", "Hallo, ich heiße Uwe!").block(); - Message mb2 = chatroomB.addMessage(1l, "ute", "Ich bin Ute...").block(); - Message mb3 = chatroomB.addMessage(1l, "klaus", "Willst du mit mir gehen?").block(); - Message mb4 = chatroomB.addMessage(2l, "peter", "Hä? Was jetzt?!? Isch glohb isch höb ühn däjah vüh...").block(); - - assertThat(chathome.getChatRoomInfo().toStream()).containsExactlyInAnyOrderElementsOf(List.of(infoA, infoB)); - assertThat(chathome.getChatRoomInfo(chatRoomAId)).emitsExactly(infoA); - assertThat(chathome - .getChatRoomData(chatRoomAId) - .flatMapMany(cr -> cr.getMessages())).emitsExactly(ma1, ma2, ma3, ma4); - assertThat(chathome.getChatRoomData(chatRoomBId)).emitsExactly(chatroomB); - assertThat(chathome - .getChatRoomData(chatRoomBId) - .flatMapMany(cr -> cr.getMessages())).emitsExactly(mb1, mb2, mb3, mb4); - - stop(); - start(); - - assertThat(chathome.getChatRoomInfo().toStream()).containsExactlyInAnyOrderElementsOf(List.of(infoA, infoB)); - assertThat(chathome.getChatRoomInfo(chatRoomAId)).emitsExactly(infoA); - assertThat(chathome - .getChatRoomData(chatRoomAId) - .flatMapMany(cr -> cr.getMessages())).emitsExactly(ma1, ma2, ma3, ma4); - assertThat(chathome.getChatRoomInfo(chatRoomBId)).emitsExactly(infoB); - assertThat(chathome - .getChatRoomData(chatRoomBId) - .flatMapMany(cr -> cr.getMessages())).emitsExactly(mb1, mb2, mb3, mb4); - } - - - interface StorageStrategyITConfig - { - ChatHomeService getChatHome(); - } -} diff --git a/src/test/java/de/juplo/kafka/chat/backend/persistence/InMemoryWithFilesStorageIT.java b/src/test/java/de/juplo/kafka/chat/backend/persistence/InMemoryWithFilesStorageIT.java deleted file mode 100644 index be40eed2..00000000 --- a/src/test/java/de/juplo/kafka/chat/backend/persistence/InMemoryWithFilesStorageIT.java +++ /dev/null @@ -1,71 +0,0 @@ -package de.juplo.kafka.chat.backend.persistence; - -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.SerializationFeature; -import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; -import de.juplo.kafka.chat.backend.persistence.storage.files.FilesStorageStrategy; -import lombok.extern.slf4j.Slf4j; -import org.junit.jupiter.api.BeforeEach; - -import java.io.IOException; -import java.nio.file.Files; -import java.nio.file.Path; -import java.nio.file.Paths; -import java.time.Clock; - - -@Slf4j -public class InMemoryWithFilesStorageIT extends AbstractInMemoryStorageIT -{ - final static Path path = Paths.get("target","files"); - - final ObjectMapper mapper; - final FilesStorageStrategy storageStrategy; - - - public InMemoryWithFilesStorageIT() - { - super(Clock.systemDefaultZone()); - mapper = new ObjectMapper(); - mapper.registerModule(new JavaTimeModule()); - mapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS); - storageStrategy = new FilesStorageStrategy( - path, - chatRoomId -> 0, - mapper); - } - - - @Override - protected StorageStrategy getStorageStrategy() - { - return storageStrategy; - } - - @BeforeEach - void reset() throws Exception - { - if (Files.exists(path)) - { - Files - .walk(path) - .forEach(file -> - { - try - { - if (!file.equals(path)) - { - log.debug("Deleting file {}", file); - Files.delete(file); - } - } - catch (IOException e) - { - throw new RuntimeException(e); - } - }); - log.debug("Deleting data-directory {}", path); - Files.delete(path); - } - } -} diff --git a/src/test/java/de/juplo/kafka/chat/backend/persistence/InMemoryWithMongoDbStorageIT.java b/src/test/java/de/juplo/kafka/chat/backend/persistence/InMemoryWithMongoDbStorageIT.java deleted file mode 100644 index a0dab37c..00000000 --- a/src/test/java/de/juplo/kafka/chat/backend/persistence/InMemoryWithMongoDbStorageIT.java +++ /dev/null @@ -1,107 +0,0 @@ -package de.juplo.kafka.chat.backend.persistence; - -import de.juplo.kafka.chat.backend.persistence.InMemoryWithMongoDbStorageIT.DataSourceInitializer; -import de.juplo.kafka.chat.backend.persistence.storage.mongodb.ChatRoomRepository; -import de.juplo.kafka.chat.backend.persistence.storage.mongodb.MessageRepository; -import de.juplo.kafka.chat.backend.persistence.storage.mongodb.MongoDbStorageStrategy; -import lombok.extern.slf4j.Slf4j; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.extension.ExtendWith; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.boot.autoconfigure.EnableAutoConfiguration; -import org.springframework.boot.test.autoconfigure.data.mongo.AutoConfigureDataMongo; -import org.springframework.boot.test.context.TestConfiguration; -import org.springframework.context.ApplicationContextInitializer; -import org.springframework.context.ConfigurableApplicationContext; -import org.springframework.context.annotation.Bean; -import org.springframework.test.context.ContextConfiguration; -import org.springframework.test.context.junit.jupiter.SpringExtension; -import org.springframework.test.context.support.TestPropertySourceUtils; -import org.testcontainers.containers.GenericContainer; -import org.testcontainers.containers.output.Slf4jLogConsumer; -import org.testcontainers.junit.jupiter.Container; -import org.testcontainers.junit.jupiter.Testcontainers; - -import java.time.Clock; - - -@Testcontainers -@ExtendWith({SpringExtension.class}) -@EnableAutoConfiguration -@AutoConfigureDataMongo -@ContextConfiguration(initializers = DataSourceInitializer.class) -@Slf4j -public class InMemoryWithMongoDbStorageIT extends AbstractInMemoryStorageIT -{ - @Autowired - MongoDbStorageStrategy storageStrategy; - @Autowired - ChatRoomRepository chatRoomRepository; - @Autowired - MessageRepository messageRepository; - - - public InMemoryWithMongoDbStorageIT() - { - super(Clock.systemDefaultZone()); - } - - - @Override - protected StorageStrategy getStorageStrategy() - { - return storageStrategy; - } - - @TestConfiguration - static class InMemoryWithMongoDbStorageStrategyITConfig - { - @Bean - MongoDbStorageStrategy storageStrategy( - ChatRoomRepository chatRoomRepository, - MessageRepository messageRepository, - Clock clock) - { - return new MongoDbStorageStrategy( - chatRoomRepository, - messageRepository, - chatRoomId -> 0); - } - - @Bean - Clock clock() - { - return Clock.systemDefaultZone(); - } - } - - private static final int MONGODB_PORT = 27017; - - @Container - private static final GenericContainer CONTAINER = - new GenericContainer("mongo:6") - .withExposedPorts(MONGODB_PORT); - - public static class DataSourceInitializer - implements ApplicationContextInitializer - { - @Override - public void initialize(ConfigurableApplicationContext applicationContext) - { - TestPropertySourceUtils.addInlinedPropertiesToEnvironment( - applicationContext, - "spring.data.mongodb.host=localhost", - "spring.data.mongodb.port=" + CONTAINER.getMappedPort(MONGODB_PORT), - "spring.data.mongodb.database=test"); - } - } - - @BeforeEach - void setUpLogging() - { - Slf4jLogConsumer logConsumer = new Slf4jLogConsumer(log); - CONTAINER.followOutput(logConsumer); - chatRoomRepository.deleteAll(); - messageRepository.deleteAll(); - } -}