refactor: `storage` is not a sub-package of `persistence` - Moved classes
authorKai Moritz <kai@juplo.de>
Wed, 6 Sep 2023 21:46:29 +0000 (23:46 +0200)
committerKai Moritz <kai@juplo.de>
Sat, 27 Jan 2024 14:16:22 +0000 (15:16 +0100)
28 files changed:
src/main/java/de/juplo/kafka/chat/backend/persistence/storage/files/FilesStorageConfiguration.java [deleted file]
src/main/java/de/juplo/kafka/chat/backend/persistence/storage/files/FilesStorageStrategy.java [deleted file]
src/main/java/de/juplo/kafka/chat/backend/persistence/storage/files/JsonFilePublisher.java [deleted file]
src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/ChatRoomRepository.java [deleted file]
src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/ChatRoomTo.java [deleted file]
src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/MessageRepository.java [deleted file]
src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/MessageTo.java [deleted file]
src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/MongoDbStorageConfiguration.java [deleted file]
src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/MongoDbStorageStrategy.java [deleted file]
src/main/java/de/juplo/kafka/chat/backend/persistence/storage/nostorage/NoStorageStorageConfiguration.java [deleted file]
src/main/java/de/juplo/kafka/chat/backend/storage/files/FilesStorageConfiguration.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/chat/backend/storage/files/FilesStorageStrategy.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/chat/backend/storage/files/JsonFilePublisher.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/chat/backend/storage/mongodb/ChatRoomRepository.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/chat/backend/storage/mongodb/ChatRoomTo.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/chat/backend/storage/mongodb/MessageRepository.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/chat/backend/storage/mongodb/MessageTo.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/chat/backend/storage/mongodb/MongoDbStorageConfiguration.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/chat/backend/storage/mongodb/MongoDbStorageStrategy.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/chat/backend/storage/nostorage/NoStorageStorageConfiguration.java [new file with mode: 0644]
src/test/java/de/juplo/kafka/chat/backend/AbstractInMemoryStorageIT.java [new file with mode: 0644]
src/test/java/de/juplo/kafka/chat/backend/AbstractStorageStrategyIT.java [new file with mode: 0644]
src/test/java/de/juplo/kafka/chat/backend/InMemoryWithFilesStorageIT.java [new file with mode: 0644]
src/test/java/de/juplo/kafka/chat/backend/InMemoryWithMongoDbStorageIT.java [new file with mode: 0644]
src/test/java/de/juplo/kafka/chat/backend/persistence/AbstractInMemoryStorageIT.java [deleted file]
src/test/java/de/juplo/kafka/chat/backend/persistence/AbstractStorageStrategyIT.java [deleted file]
src/test/java/de/juplo/kafka/chat/backend/persistence/InMemoryWithFilesStorageIT.java [deleted file]
src/test/java/de/juplo/kafka/chat/backend/persistence/InMemoryWithMongoDbStorageIT.java [deleted file]

diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/files/FilesStorageConfiguration.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/files/FilesStorageConfiguration.java
deleted file mode 100644 (file)
index 3a59acb..0000000
+++ /dev/null
@@ -1,39 +0,0 @@
-package de.juplo.kafka.chat.backend.persistence.storage.files;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import de.juplo.kafka.chat.backend.ChatBackendProperties;
-import de.juplo.kafka.chat.backend.persistence.ShardingStrategy;
-import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
-import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
-import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
-import org.springframework.boot.autoconfigure.data.mongo.MongoRepositoriesAutoConfiguration;
-import org.springframework.boot.autoconfigure.mongo.MongoAutoConfiguration;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Configuration;
-
-import java.nio.file.Paths;
-
-
-@ConditionalOnProperty(
-    prefix = "chat.backend.inmemory",
-    name = "storage-strategy",
-    havingValue = "files")
-@Configuration
-@EnableAutoConfiguration(
-    exclude = {
-        MongoRepositoriesAutoConfiguration.class,
-        MongoAutoConfiguration.class })
-public class FilesStorageConfiguration
-{
-  @Bean
-  public StorageStrategy storageStrategy(
-      ChatBackendProperties properties,
-      ShardingStrategy shardingStrategy,
-      ObjectMapper mapper)
-  {
-    return new FilesStorageStrategy(
-        Paths.get(properties.getInmemory().getStorageDirectory()),
-        shardingStrategy,
-        mapper);
-  }
-}
diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/files/FilesStorageStrategy.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/files/FilesStorageStrategy.java
deleted file mode 100644 (file)
index 9c79197..0000000
+++ /dev/null
@@ -1,201 +0,0 @@
-package de.juplo.kafka.chat.backend.persistence.storage.files;
-
-import com.fasterxml.jackson.core.JsonGenerator;
-import com.fasterxml.jackson.databind.JavaType;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import de.juplo.kafka.chat.backend.api.ChatRoomInfoTo;
-import de.juplo.kafka.chat.backend.api.MessageTo;
-import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
-import de.juplo.kafka.chat.backend.domain.Message;
-import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
-import de.juplo.kafka.chat.backend.persistence.ShardingStrategy;
-import lombok.RequiredArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
-import reactor.core.publisher.Flux;
-
-import java.io.IOException;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.util.UUID;
-
-import static java.nio.file.StandardOpenOption.CREATE;
-import static java.nio.file.StandardOpenOption.TRUNCATE_EXISTING;
-
-
-@RequiredArgsConstructor
-@Slf4j
-public class FilesStorageStrategy implements StorageStrategy
-{
-  public static final String CHATROOMS_FILENAME = "chatrooms.json";
-
-
-  private final Path storagePath;
-  private final ShardingStrategy shardingStrategy;
-  private final ObjectMapper mapper;
-
-
-  @Override
-  public void writeChatRoomInfo(Flux<ChatRoomInfo> chatRoomInfoFlux)
-  {
-    Path path = chatroomsPath();
-    log.info("Writing chatrooms to {}", path);
-    try
-    {
-      Files.createDirectories(storagePath);
-
-      JsonGenerator generator =
-          mapper
-              .getFactory()
-              .createGenerator(Files.newBufferedWriter(path, CREATE, TRUNCATE_EXISTING));
-
-      chatRoomInfoFlux
-          .log()
-          .doFirst(() ->
-          {
-            try
-            {
-              generator.useDefaultPrettyPrinter();
-              generator.writeStartArray();
-            }
-            catch (IOException e)
-            {
-              throw new RuntimeException(e);
-            }
-          })
-          .doOnTerminate(() ->
-          {
-            try
-            {
-              generator.writeEndArray();
-              generator.close();
-            }
-            catch (IOException e)
-            {
-              throw new RuntimeException(e);
-            }
-          })
-          .subscribe(chatRoomInfo ->
-          {
-            try
-            {
-              ChatRoomInfoTo chatRoomInfoTo = ChatRoomInfoTo.from(chatRoomInfo);
-              generator.writeObject(chatRoomInfoTo);
-            }
-            catch (IOException e)
-            {
-              throw new RuntimeException(e);
-            }
-          });
-    }
-    catch (IOException e)
-    {
-      throw new RuntimeException(e);
-    }
-  }
-
-  @Override
-  public Flux<ChatRoomInfo> readChatRoomInfo()
-  {
-    JavaType type = mapper.getTypeFactory().constructType(ChatRoomInfoTo.class);
-    return Flux
-        .from(new JsonFilePublisher<ChatRoomInfoTo>(chatroomsPath(), mapper, type))
-        .log()
-        .map(chatRoomInfoTo ->
-        {
-          UUID chatRoomId = chatRoomInfoTo.getId();
-          int shard = shardingStrategy.selectShard(chatRoomId);
-
-          log.info(
-              "{} - old shard: {}, new shard:  {}",
-              chatRoomId,
-              chatRoomInfoTo.getShard(),
-              shard);
-
-          return new ChatRoomInfo(
-              chatRoomId,
-              chatRoomInfoTo.getName(),
-              shard);
-        });
-  }
-
-  @Override
-  public void writeChatRoomData(
-      UUID chatRoomId,
-      Flux<Message> messageFlux)
-  {
-    Path path = chatroomPath(chatRoomId);
-    log.info("Writing messages for {} to {}", chatRoomId, path);
-    try
-    {
-      Files.createDirectories(storagePath);
-
-      JsonGenerator generator =
-          mapper
-              .getFactory()
-              .createGenerator(Files.newBufferedWriter(path, CREATE, TRUNCATE_EXISTING));
-
-      messageFlux
-          .log()
-          .doFirst(() ->
-          {
-            try
-            {
-              generator.useDefaultPrettyPrinter();
-              generator.writeStartArray();
-            }
-            catch (IOException e)
-            {
-              throw new RuntimeException(e);
-            }
-          })
-          .doOnTerminate(() ->
-          {
-            try
-            {
-              generator.writeEndArray();
-              generator.close();
-            }
-            catch (IOException e)
-            {
-              throw new RuntimeException(e);
-            }
-          })
-          .subscribe(message ->
-          {
-            try
-            {
-              MessageTo messageTo = MessageTo.from(message);
-              generator.writeObject(messageTo);
-            }
-            catch (IOException e)
-            {
-              throw new RuntimeException(e);
-            }
-          });
-    }
-    catch (IOException e)
-    {
-      throw new RuntimeException(e);
-    }
-  }
-
-  @Override
-  public Flux<Message> readChatRoomData(UUID chatRoomId)
-  {
-    JavaType type = mapper.getTypeFactory().constructType(MessageTo.class);
-    return Flux
-        .from(new JsonFilePublisher<MessageTo>(chatroomPath(chatRoomId), mapper, type))
-        .log()
-        .map(MessageTo::toMessage);
-  }
-
-  Path chatroomsPath()
-  {
-    return storagePath.resolve(Path.of(CHATROOMS_FILENAME));
-  }
-
-  Path chatroomPath(UUID id)
-  {
-    return storagePath.resolve(Path.of(id.toString() + ".json"));
-  }
-}
diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/files/JsonFilePublisher.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/files/JsonFilePublisher.java
deleted file mode 100644 (file)
index aec8b36..0000000
+++ /dev/null
@@ -1,118 +0,0 @@
-package de.juplo.kafka.chat.backend.persistence.storage.files;
-
-import com.fasterxml.jackson.core.JsonParser;
-import com.fasterxml.jackson.core.JsonToken;
-import com.fasterxml.jackson.databind.JavaType;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import lombok.RequiredArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
-import org.reactivestreams.Publisher;
-import org.reactivestreams.Subscriber;
-import org.reactivestreams.Subscription;
-
-import java.io.IOException;
-import java.nio.file.Files;
-import java.nio.file.NoSuchFileException;
-import java.nio.file.Path;
-import java.util.Iterator;
-import java.util.List;
-import java.util.function.Consumer;
-
-
-@RequiredArgsConstructor
-@Slf4j
-public class JsonFilePublisher<T> implements Publisher<T>
-{
-  private final Path path;
-  private final ObjectMapper mapper;
-  private final JavaType type;
-
-
-  @Override
-  public void subscribe(Subscriber<? super T> subscriber)
-  {
-    log.info("Reading chatrooms from {}", path);
-
-    try
-    {
-      JsonParser parser =
-          mapper.getFactory().createParser(Files.newBufferedReader(path));
-
-      if (parser.nextToken() != JsonToken.START_ARRAY)
-      {
-        throw new IllegalStateException("Expected content to be an array");
-      }
-
-      subscriber.onSubscribe(new JsonFileSubscription(subscriber, parser));
-    }
-    catch (NoSuchFileException e)
-    {
-      log.info("{} does not exist - starting with empty ChatHome", path);
-      subscriber.onSubscribe(new ReplaySubscription(subscriber, List.of()));
-    }
-    catch (IOException | IllegalStateException e)
-    {
-      subscriber.onSubscribe(new ReplaySubscription(subscriber, List.of((s -> s.onError(e)))));
-    }
-  }
-
-  @RequiredArgsConstructor
-  private class JsonFileSubscription implements Subscription
-  {
-    private final Subscriber<? super T> subscriber;
-    private final JsonParser parser;
-
-    @Override
-    public void request(long requested)
-    {
-      try
-      {
-        while (requested > 0 && parser.nextToken() != JsonToken.END_ARRAY)
-        {
-          subscriber.onNext(mapper.readValue(parser, type));
-          requested--;
-        }
-
-        if (requested > 0)
-          subscriber.onComplete();
-      }
-      catch (IOException e)
-      {
-        subscriber.onError(e);
-      }
-    }
-
-    @Override
-    public void cancel() {}
-  }
-
-  private class ReplaySubscription implements Subscription
-  {
-    private final Subscriber<? super T> subscriber;
-    private final Iterator<Consumer<Subscriber<? super T>>> iterator;
-
-    ReplaySubscription(
-        Subscriber<? super T> subscriber,
-        Iterable<Consumer<Subscriber<? super T>>> actions)
-    {
-      this.subscriber = subscriber;
-      this.iterator = actions.iterator();
-    }
-
-    @Override
-    public void request(long requested)
-    {
-      while (requested > 0 && iterator.hasNext())
-      {
-        iterator.next().accept(subscriber);
-        requested--;
-      }
-
-      if (requested > 0)
-        subscriber.onComplete();
-    }
-
-    @Override
-    public void cancel() {}
-  }
-}
diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/ChatRoomRepository.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/ChatRoomRepository.java
deleted file mode 100644 (file)
index 12e5b96..0000000
+++ /dev/null
@@ -1,8 +0,0 @@
-package de.juplo.kafka.chat.backend.persistence.storage.mongodb;
-
-import org.springframework.data.mongodb.repository.MongoRepository;
-
-
-public interface ChatRoomRepository extends MongoRepository<ChatRoomTo, String>
-{
-}
diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/ChatRoomTo.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/ChatRoomTo.java
deleted file mode 100644 (file)
index 0086053..0000000
+++ /dev/null
@@ -1,30 +0,0 @@
-package de.juplo.kafka.chat.backend.persistence.storage.mongodb;
-
-import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
-import lombok.*;
-import org.springframework.data.annotation.Id;
-import org.springframework.data.mongodb.core.mapping.Document;
-
-
-@AllArgsConstructor
-@NoArgsConstructor
-@Getter(AccessLevel.PACKAGE)
-@Setter(AccessLevel.PACKAGE)
-@EqualsAndHashCode(of = { "id" })
-@ToString(of = { "id", "shard", "name" })
-@Document
-public class ChatRoomTo
-{
-  @Id
-  private String id;
-  private Integer shard;
-  private String name;
-
-  public static ChatRoomTo from(ChatRoomInfo chatRoomInfo)
-  {
-    return new ChatRoomTo(
-        chatRoomInfo.getId().toString(),
-        chatRoomInfo.getShard(),
-        chatRoomInfo.getName());
-  }
-}
diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/MessageRepository.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/MessageRepository.java
deleted file mode 100644 (file)
index a429f96..0000000
+++ /dev/null
@@ -1,11 +0,0 @@
-package de.juplo.kafka.chat.backend.persistence.storage.mongodb;
-
-import org.springframework.data.mongodb.repository.MongoRepository;
-
-import java.util.List;
-
-
-public interface MessageRepository extends MongoRepository<MessageTo, String>
-{
-  List<MessageTo> findByChatRoomIdOrderBySerialAsc(String chatRoomId);
-}
diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/MessageTo.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/MessageTo.java
deleted file mode 100644 (file)
index f6c6b85..0000000
+++ /dev/null
@@ -1,54 +0,0 @@
-package de.juplo.kafka.chat.backend.persistence.storage.mongodb;
-
-import de.juplo.kafka.chat.backend.domain.Message;
-import lombok.*;
-import org.springframework.data.mongodb.core.index.Indexed;
-import org.springframework.data.mongodb.core.mapping.Document;
-import org.springframework.data.mongodb.core.mapping.Field;
-
-import java.time.LocalDateTime;
-import java.util.UUID;
-
-
-@AllArgsConstructor
-@NoArgsConstructor
-@Getter(AccessLevel.PACKAGE)
-@Setter(AccessLevel.PACKAGE)
-@EqualsAndHashCode(of = { "chatRoomId", "user", "id" })
-@ToString(of = { "chatRoomId", "user", "id" })
-@Document
-class MessageTo
-{
-  @Indexed
-  private String chatRoomId;
-  @Indexed
-  private String user;
-  @Field("id")
-  @Indexed
-  private Long id;
-  @Indexed
-  private Long serial;
-  private String time;
-  private String text;
-
-  Message toMessage()
-  {
-    return new Message(
-        Message.MessageKey.of(user, id),
-        serial,
-        LocalDateTime.parse(time),
-        text);
-  }
-
-  static MessageTo from(UUID chatRoomId, Message message)
-  {
-    return
-        new MessageTo(
-            chatRoomId.toString(),
-            message.getUsername(),
-            message.getId(),
-            message.getSerialNumber(),
-            message.getTimestamp().toString(),
-            message.getMessageText());
-  }
-}
diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/MongoDbStorageConfiguration.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/MongoDbStorageConfiguration.java
deleted file mode 100644 (file)
index e6f7149..0000000
+++ /dev/null
@@ -1,28 +0,0 @@
-package de.juplo.kafka.chat.backend.persistence.storage.mongodb;
-
-import de.juplo.kafka.chat.backend.persistence.ShardingStrategy;
-import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
-import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Configuration;
-
-
-@ConditionalOnProperty(
-    prefix = "chat.backend.inmemory",
-    name = "storage-strategy",
-    havingValue = "mongodb")
-@Configuration
-public class MongoDbStorageConfiguration
-{
-  @Bean
-  public StorageStrategy storageStrategy(
-      ChatRoomRepository chatRoomRepository,
-      MessageRepository messageRepository,
-      ShardingStrategy shardingStrategy)
-  {
-    return new MongoDbStorageStrategy(
-        chatRoomRepository,
-        messageRepository,
-        shardingStrategy);
-  }
-}
diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/MongoDbStorageStrategy.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/MongoDbStorageStrategy.java
deleted file mode 100644 (file)
index 644ab88..0000000
+++ /dev/null
@@ -1,69 +0,0 @@
-package de.juplo.kafka.chat.backend.persistence.storage.mongodb;
-
-import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
-import de.juplo.kafka.chat.backend.domain.Message;
-import de.juplo.kafka.chat.backend.persistence.ShardingStrategy;
-import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
-import lombok.RequiredArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
-import reactor.core.publisher.Flux;
-
-import java.util.UUID;
-
-
-@RequiredArgsConstructor
-@Slf4j
-public class MongoDbStorageStrategy implements StorageStrategy
-{
-  private final ChatRoomRepository chatRoomRepository;
-  private final MessageRepository messageRepository;
-  private final ShardingStrategy shardingStrategy;
-
-
-  @Override
-  public void writeChatRoomInfo(Flux<ChatRoomInfo> chatRoomInfoFlux)
-  {
-    chatRoomInfoFlux
-        .map(ChatRoomTo::from)
-        .subscribe(chatroomTo -> chatRoomRepository.save(chatroomTo));
-  }
-
-  @Override
-  public Flux<ChatRoomInfo> readChatRoomInfo()
-  {
-    return Flux
-        .fromIterable(chatRoomRepository.findAll())
-        .map(chatRoomTo ->
-        {
-          UUID chatRoomId = UUID.fromString(chatRoomTo.getId());
-          int shard = shardingStrategy.selectShard(chatRoomId);
-
-          log.info(
-              "{} - old shard: {}, new shard:  {}",
-              chatRoomId,
-              chatRoomTo.getShard(),
-              shard);
-
-          return new ChatRoomInfo(
-              chatRoomId,
-              chatRoomTo.getName(),
-              shard);
-        });
-  }
-
-  @Override
-  public void writeChatRoomData(UUID chatRoomId, Flux<Message> messageFlux)
-  {
-    messageFlux
-        .map(message -> MessageTo.from(chatRoomId, message))
-        .subscribe(messageTo -> messageRepository.save(messageTo));
-  }
-
-  @Override
-  public Flux<Message> readChatRoomData(UUID chatRoomId)
-  {
-    return Flux
-        .fromIterable(messageRepository.findByChatRoomIdOrderBySerialAsc(chatRoomId.toString()))
-        .map(messageTo -> messageTo.toMessage());
-  }
-}
diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/nostorage/NoStorageStorageConfiguration.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/nostorage/NoStorageStorageConfiguration.java
deleted file mode 100644 (file)
index ab24bb8..0000000
+++ /dev/null
@@ -1,53 +0,0 @@
-package de.juplo.kafka.chat.backend.persistence.storage.nostorage;
-
-import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
-import de.juplo.kafka.chat.backend.domain.Message;
-import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
-import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
-import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
-import org.springframework.boot.autoconfigure.data.mongo.MongoRepositoriesAutoConfiguration;
-import org.springframework.boot.autoconfigure.mongo.MongoAutoConfiguration;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Configuration;
-import reactor.core.publisher.Flux;
-
-import java.util.UUID;
-
-
-@ConditionalOnProperty(
-    prefix = "chat.backend.inmemory",
-    name = "storage-strategy",
-    havingValue = "none",
-    matchIfMissing = true)
-@Configuration
-@EnableAutoConfiguration(
-    exclude = {
-        MongoRepositoriesAutoConfiguration.class,
-        MongoAutoConfiguration.class })
-public class NoStorageStorageConfiguration
-{
-  @Bean
-  public StorageStrategy storageStrategy()
-  {
-    return new StorageStrategy()
-    {
-      @Override
-      public void writeChatRoomInfo(Flux<ChatRoomInfo> chatRoomInfoFlux) {}
-
-      @Override
-      public Flux<ChatRoomInfo> readChatRoomInfo()
-      {
-        return Flux.empty();
-      }
-
-      @Override
-      public void writeChatRoomData(UUID chatRoomId, Flux<Message> messageFlux) {}
-
-      @Override
-      public Flux<Message> readChatRoomData(UUID chatRoomId)
-      {
-        return Flux.empty();
-      }
-    };
-  }
-}
diff --git a/src/main/java/de/juplo/kafka/chat/backend/storage/files/FilesStorageConfiguration.java b/src/main/java/de/juplo/kafka/chat/backend/storage/files/FilesStorageConfiguration.java
new file mode 100644 (file)
index 0000000..3a59acb
--- /dev/null
@@ -0,0 +1,39 @@
+package de.juplo.kafka.chat.backend.persistence.storage.files;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import de.juplo.kafka.chat.backend.ChatBackendProperties;
+import de.juplo.kafka.chat.backend.persistence.ShardingStrategy;
+import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
+import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.boot.autoconfigure.data.mongo.MongoRepositoriesAutoConfiguration;
+import org.springframework.boot.autoconfigure.mongo.MongoAutoConfiguration;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+import java.nio.file.Paths;
+
+
+@ConditionalOnProperty(
+    prefix = "chat.backend.inmemory",
+    name = "storage-strategy",
+    havingValue = "files")
+@Configuration
+@EnableAutoConfiguration(
+    exclude = {
+        MongoRepositoriesAutoConfiguration.class,
+        MongoAutoConfiguration.class })
+public class FilesStorageConfiguration
+{
+  @Bean
+  public StorageStrategy storageStrategy(
+      ChatBackendProperties properties,
+      ShardingStrategy shardingStrategy,
+      ObjectMapper mapper)
+  {
+    return new FilesStorageStrategy(
+        Paths.get(properties.getInmemory().getStorageDirectory()),
+        shardingStrategy,
+        mapper);
+  }
+}
diff --git a/src/main/java/de/juplo/kafka/chat/backend/storage/files/FilesStorageStrategy.java b/src/main/java/de/juplo/kafka/chat/backend/storage/files/FilesStorageStrategy.java
new file mode 100644 (file)
index 0000000..9c79197
--- /dev/null
@@ -0,0 +1,201 @@
+package de.juplo.kafka.chat.backend.persistence.storage.files;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.JavaType;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import de.juplo.kafka.chat.backend.api.ChatRoomInfoTo;
+import de.juplo.kafka.chat.backend.api.MessageTo;
+import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
+import de.juplo.kafka.chat.backend.domain.Message;
+import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
+import de.juplo.kafka.chat.backend.persistence.ShardingStrategy;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import reactor.core.publisher.Flux;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.UUID;
+
+import static java.nio.file.StandardOpenOption.CREATE;
+import static java.nio.file.StandardOpenOption.TRUNCATE_EXISTING;
+
+
+@RequiredArgsConstructor
+@Slf4j
+public class FilesStorageStrategy implements StorageStrategy
+{
+  public static final String CHATROOMS_FILENAME = "chatrooms.json";
+
+
+  private final Path storagePath;
+  private final ShardingStrategy shardingStrategy;
+  private final ObjectMapper mapper;
+
+
+  @Override
+  public void writeChatRoomInfo(Flux<ChatRoomInfo> chatRoomInfoFlux)
+  {
+    Path path = chatroomsPath();
+    log.info("Writing chatrooms to {}", path);
+    try
+    {
+      Files.createDirectories(storagePath);
+
+      JsonGenerator generator =
+          mapper
+              .getFactory()
+              .createGenerator(Files.newBufferedWriter(path, CREATE, TRUNCATE_EXISTING));
+
+      chatRoomInfoFlux
+          .log()
+          .doFirst(() ->
+          {
+            try
+            {
+              generator.useDefaultPrettyPrinter();
+              generator.writeStartArray();
+            }
+            catch (IOException e)
+            {
+              throw new RuntimeException(e);
+            }
+          })
+          .doOnTerminate(() ->
+          {
+            try
+            {
+              generator.writeEndArray();
+              generator.close();
+            }
+            catch (IOException e)
+            {
+              throw new RuntimeException(e);
+            }
+          })
+          .subscribe(chatRoomInfo ->
+          {
+            try
+            {
+              ChatRoomInfoTo chatRoomInfoTo = ChatRoomInfoTo.from(chatRoomInfo);
+              generator.writeObject(chatRoomInfoTo);
+            }
+            catch (IOException e)
+            {
+              throw new RuntimeException(e);
+            }
+          });
+    }
+    catch (IOException e)
+    {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public Flux<ChatRoomInfo> readChatRoomInfo()
+  {
+    JavaType type = mapper.getTypeFactory().constructType(ChatRoomInfoTo.class);
+    return Flux
+        .from(new JsonFilePublisher<ChatRoomInfoTo>(chatroomsPath(), mapper, type))
+        .log()
+        .map(chatRoomInfoTo ->
+        {
+          UUID chatRoomId = chatRoomInfoTo.getId();
+          int shard = shardingStrategy.selectShard(chatRoomId);
+
+          log.info(
+              "{} - old shard: {}, new shard:  {}",
+              chatRoomId,
+              chatRoomInfoTo.getShard(),
+              shard);
+
+          return new ChatRoomInfo(
+              chatRoomId,
+              chatRoomInfoTo.getName(),
+              shard);
+        });
+  }
+
+  @Override
+  public void writeChatRoomData(
+      UUID chatRoomId,
+      Flux<Message> messageFlux)
+  {
+    Path path = chatroomPath(chatRoomId);
+    log.info("Writing messages for {} to {}", chatRoomId, path);
+    try
+    {
+      Files.createDirectories(storagePath);
+
+      JsonGenerator generator =
+          mapper
+              .getFactory()
+              .createGenerator(Files.newBufferedWriter(path, CREATE, TRUNCATE_EXISTING));
+
+      messageFlux
+          .log()
+          .doFirst(() ->
+          {
+            try
+            {
+              generator.useDefaultPrettyPrinter();
+              generator.writeStartArray();
+            }
+            catch (IOException e)
+            {
+              throw new RuntimeException(e);
+            }
+          })
+          .doOnTerminate(() ->
+          {
+            try
+            {
+              generator.writeEndArray();
+              generator.close();
+            }
+            catch (IOException e)
+            {
+              throw new RuntimeException(e);
+            }
+          })
+          .subscribe(message ->
+          {
+            try
+            {
+              MessageTo messageTo = MessageTo.from(message);
+              generator.writeObject(messageTo);
+            }
+            catch (IOException e)
+            {
+              throw new RuntimeException(e);
+            }
+          });
+    }
+    catch (IOException e)
+    {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public Flux<Message> readChatRoomData(UUID chatRoomId)
+  {
+    JavaType type = mapper.getTypeFactory().constructType(MessageTo.class);
+    return Flux
+        .from(new JsonFilePublisher<MessageTo>(chatroomPath(chatRoomId), mapper, type))
+        .log()
+        .map(MessageTo::toMessage);
+  }
+
+  Path chatroomsPath()
+  {
+    return storagePath.resolve(Path.of(CHATROOMS_FILENAME));
+  }
+
+  Path chatroomPath(UUID id)
+  {
+    return storagePath.resolve(Path.of(id.toString() + ".json"));
+  }
+}
diff --git a/src/main/java/de/juplo/kafka/chat/backend/storage/files/JsonFilePublisher.java b/src/main/java/de/juplo/kafka/chat/backend/storage/files/JsonFilePublisher.java
new file mode 100644 (file)
index 0000000..aec8b36
--- /dev/null
@@ -0,0 +1,118 @@
+package de.juplo.kafka.chat.backend.persistence.storage.files;
+
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonToken;
+import com.fasterxml.jackson.databind.JavaType;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.reactivestreams.Publisher;
+import org.reactivestreams.Subscriber;
+import org.reactivestreams.Subscription;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.NoSuchFileException;
+import java.nio.file.Path;
+import java.util.Iterator;
+import java.util.List;
+import java.util.function.Consumer;
+
+
+@RequiredArgsConstructor
+@Slf4j
+public class JsonFilePublisher<T> implements Publisher<T>
+{
+  private final Path path;
+  private final ObjectMapper mapper;
+  private final JavaType type;
+
+
+  @Override
+  public void subscribe(Subscriber<? super T> subscriber)
+  {
+    log.info("Reading chatrooms from {}", path);
+
+    try
+    {
+      JsonParser parser =
+          mapper.getFactory().createParser(Files.newBufferedReader(path));
+
+      if (parser.nextToken() != JsonToken.START_ARRAY)
+      {
+        throw new IllegalStateException("Expected content to be an array");
+      }
+
+      subscriber.onSubscribe(new JsonFileSubscription(subscriber, parser));
+    }
+    catch (NoSuchFileException e)
+    {
+      log.info("{} does not exist - starting with empty ChatHome", path);
+      subscriber.onSubscribe(new ReplaySubscription(subscriber, List.of()));
+    }
+    catch (IOException | IllegalStateException e)
+    {
+      subscriber.onSubscribe(new ReplaySubscription(subscriber, List.of((s -> s.onError(e)))));
+    }
+  }
+
+  @RequiredArgsConstructor
+  private class JsonFileSubscription implements Subscription
+  {
+    private final Subscriber<? super T> subscriber;
+    private final JsonParser parser;
+
+    @Override
+    public void request(long requested)
+    {
+      try
+      {
+        while (requested > 0 && parser.nextToken() != JsonToken.END_ARRAY)
+        {
+          subscriber.onNext(mapper.readValue(parser, type));
+          requested--;
+        }
+
+        if (requested > 0)
+          subscriber.onComplete();
+      }
+      catch (IOException e)
+      {
+        subscriber.onError(e);
+      }
+    }
+
+    @Override
+    public void cancel() {}
+  }
+
+  private class ReplaySubscription implements Subscription
+  {
+    private final Subscriber<? super T> subscriber;
+    private final Iterator<Consumer<Subscriber<? super T>>> iterator;
+
+    ReplaySubscription(
+        Subscriber<? super T> subscriber,
+        Iterable<Consumer<Subscriber<? super T>>> actions)
+    {
+      this.subscriber = subscriber;
+      this.iterator = actions.iterator();
+    }
+
+    @Override
+    public void request(long requested)
+    {
+      while (requested > 0 && iterator.hasNext())
+      {
+        iterator.next().accept(subscriber);
+        requested--;
+      }
+
+      if (requested > 0)
+        subscriber.onComplete();
+    }
+
+    @Override
+    public void cancel() {}
+  }
+}
diff --git a/src/main/java/de/juplo/kafka/chat/backend/storage/mongodb/ChatRoomRepository.java b/src/main/java/de/juplo/kafka/chat/backend/storage/mongodb/ChatRoomRepository.java
new file mode 100644 (file)
index 0000000..12e5b96
--- /dev/null
@@ -0,0 +1,8 @@
+package de.juplo.kafka.chat.backend.persistence.storage.mongodb;
+
+import org.springframework.data.mongodb.repository.MongoRepository;
+
+
+public interface ChatRoomRepository extends MongoRepository<ChatRoomTo, String>
+{
+}
diff --git a/src/main/java/de/juplo/kafka/chat/backend/storage/mongodb/ChatRoomTo.java b/src/main/java/de/juplo/kafka/chat/backend/storage/mongodb/ChatRoomTo.java
new file mode 100644 (file)
index 0000000..0086053
--- /dev/null
@@ -0,0 +1,30 @@
+package de.juplo.kafka.chat.backend.persistence.storage.mongodb;
+
+import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
+import lombok.*;
+import org.springframework.data.annotation.Id;
+import org.springframework.data.mongodb.core.mapping.Document;
+
+
+@AllArgsConstructor
+@NoArgsConstructor
+@Getter(AccessLevel.PACKAGE)
+@Setter(AccessLevel.PACKAGE)
+@EqualsAndHashCode(of = { "id" })
+@ToString(of = { "id", "shard", "name" })
+@Document
+public class ChatRoomTo
+{
+  @Id
+  private String id;
+  private Integer shard;
+  private String name;
+
+  public static ChatRoomTo from(ChatRoomInfo chatRoomInfo)
+  {
+    return new ChatRoomTo(
+        chatRoomInfo.getId().toString(),
+        chatRoomInfo.getShard(),
+        chatRoomInfo.getName());
+  }
+}
diff --git a/src/main/java/de/juplo/kafka/chat/backend/storage/mongodb/MessageRepository.java b/src/main/java/de/juplo/kafka/chat/backend/storage/mongodb/MessageRepository.java
new file mode 100644 (file)
index 0000000..a429f96
--- /dev/null
@@ -0,0 +1,11 @@
+package de.juplo.kafka.chat.backend.persistence.storage.mongodb;
+
+import org.springframework.data.mongodb.repository.MongoRepository;
+
+import java.util.List;
+
+
+public interface MessageRepository extends MongoRepository<MessageTo, String>
+{
+  List<MessageTo> findByChatRoomIdOrderBySerialAsc(String chatRoomId);
+}
diff --git a/src/main/java/de/juplo/kafka/chat/backend/storage/mongodb/MessageTo.java b/src/main/java/de/juplo/kafka/chat/backend/storage/mongodb/MessageTo.java
new file mode 100644 (file)
index 0000000..f6c6b85
--- /dev/null
@@ -0,0 +1,54 @@
+package de.juplo.kafka.chat.backend.persistence.storage.mongodb;
+
+import de.juplo.kafka.chat.backend.domain.Message;
+import lombok.*;
+import org.springframework.data.mongodb.core.index.Indexed;
+import org.springframework.data.mongodb.core.mapping.Document;
+import org.springframework.data.mongodb.core.mapping.Field;
+
+import java.time.LocalDateTime;
+import java.util.UUID;
+
+
+@AllArgsConstructor
+@NoArgsConstructor
+@Getter(AccessLevel.PACKAGE)
+@Setter(AccessLevel.PACKAGE)
+@EqualsAndHashCode(of = { "chatRoomId", "user", "id" })
+@ToString(of = { "chatRoomId", "user", "id" })
+@Document
+class MessageTo
+{
+  @Indexed
+  private String chatRoomId;
+  @Indexed
+  private String user;
+  @Field("id")
+  @Indexed
+  private Long id;
+  @Indexed
+  private Long serial;
+  private String time;
+  private String text;
+
+  Message toMessage()
+  {
+    return new Message(
+        Message.MessageKey.of(user, id),
+        serial,
+        LocalDateTime.parse(time),
+        text);
+  }
+
+  static MessageTo from(UUID chatRoomId, Message message)
+  {
+    return
+        new MessageTo(
+            chatRoomId.toString(),
+            message.getUsername(),
+            message.getId(),
+            message.getSerialNumber(),
+            message.getTimestamp().toString(),
+            message.getMessageText());
+  }
+}
diff --git a/src/main/java/de/juplo/kafka/chat/backend/storage/mongodb/MongoDbStorageConfiguration.java b/src/main/java/de/juplo/kafka/chat/backend/storage/mongodb/MongoDbStorageConfiguration.java
new file mode 100644 (file)
index 0000000..e6f7149
--- /dev/null
@@ -0,0 +1,28 @@
+package de.juplo.kafka.chat.backend.persistence.storage.mongodb;
+
+import de.juplo.kafka.chat.backend.persistence.ShardingStrategy;
+import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+
+@ConditionalOnProperty(
+    prefix = "chat.backend.inmemory",
+    name = "storage-strategy",
+    havingValue = "mongodb")
+@Configuration
+public class MongoDbStorageConfiguration
+{
+  @Bean
+  public StorageStrategy storageStrategy(
+      ChatRoomRepository chatRoomRepository,
+      MessageRepository messageRepository,
+      ShardingStrategy shardingStrategy)
+  {
+    return new MongoDbStorageStrategy(
+        chatRoomRepository,
+        messageRepository,
+        shardingStrategy);
+  }
+}
diff --git a/src/main/java/de/juplo/kafka/chat/backend/storage/mongodb/MongoDbStorageStrategy.java b/src/main/java/de/juplo/kafka/chat/backend/storage/mongodb/MongoDbStorageStrategy.java
new file mode 100644 (file)
index 0000000..644ab88
--- /dev/null
@@ -0,0 +1,69 @@
+package de.juplo.kafka.chat.backend.persistence.storage.mongodb;
+
+import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
+import de.juplo.kafka.chat.backend.domain.Message;
+import de.juplo.kafka.chat.backend.persistence.ShardingStrategy;
+import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import reactor.core.publisher.Flux;
+
+import java.util.UUID;
+
+
+@RequiredArgsConstructor
+@Slf4j
+public class MongoDbStorageStrategy implements StorageStrategy
+{
+  private final ChatRoomRepository chatRoomRepository;
+  private final MessageRepository messageRepository;
+  private final ShardingStrategy shardingStrategy;
+
+
+  @Override
+  public void writeChatRoomInfo(Flux<ChatRoomInfo> chatRoomInfoFlux)
+  {
+    chatRoomInfoFlux
+        .map(ChatRoomTo::from)
+        .subscribe(chatroomTo -> chatRoomRepository.save(chatroomTo));
+  }
+
+  @Override
+  public Flux<ChatRoomInfo> readChatRoomInfo()
+  {
+    return Flux
+        .fromIterable(chatRoomRepository.findAll())
+        .map(chatRoomTo ->
+        {
+          UUID chatRoomId = UUID.fromString(chatRoomTo.getId());
+          int shard = shardingStrategy.selectShard(chatRoomId);
+
+          log.info(
+              "{} - old shard: {}, new shard:  {}",
+              chatRoomId,
+              chatRoomTo.getShard(),
+              shard);
+
+          return new ChatRoomInfo(
+              chatRoomId,
+              chatRoomTo.getName(),
+              shard);
+        });
+  }
+
+  @Override
+  public void writeChatRoomData(UUID chatRoomId, Flux<Message> messageFlux)
+  {
+    messageFlux
+        .map(message -> MessageTo.from(chatRoomId, message))
+        .subscribe(messageTo -> messageRepository.save(messageTo));
+  }
+
+  @Override
+  public Flux<Message> readChatRoomData(UUID chatRoomId)
+  {
+    return Flux
+        .fromIterable(messageRepository.findByChatRoomIdOrderBySerialAsc(chatRoomId.toString()))
+        .map(messageTo -> messageTo.toMessage());
+  }
+}
diff --git a/src/main/java/de/juplo/kafka/chat/backend/storage/nostorage/NoStorageStorageConfiguration.java b/src/main/java/de/juplo/kafka/chat/backend/storage/nostorage/NoStorageStorageConfiguration.java
new file mode 100644 (file)
index 0000000..ab24bb8
--- /dev/null
@@ -0,0 +1,53 @@
+package de.juplo.kafka.chat.backend.persistence.storage.nostorage;
+
+import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
+import de.juplo.kafka.chat.backend.domain.Message;
+import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
+import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.boot.autoconfigure.data.mongo.MongoRepositoriesAutoConfiguration;
+import org.springframework.boot.autoconfigure.mongo.MongoAutoConfiguration;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import reactor.core.publisher.Flux;
+
+import java.util.UUID;
+
+
+@ConditionalOnProperty(
+    prefix = "chat.backend.inmemory",
+    name = "storage-strategy",
+    havingValue = "none",
+    matchIfMissing = true)
+@Configuration
+@EnableAutoConfiguration(
+    exclude = {
+        MongoRepositoriesAutoConfiguration.class,
+        MongoAutoConfiguration.class })
+public class NoStorageStorageConfiguration
+{
+  @Bean
+  public StorageStrategy storageStrategy()
+  {
+    return new StorageStrategy()
+    {
+      @Override
+      public void writeChatRoomInfo(Flux<ChatRoomInfo> chatRoomInfoFlux) {}
+
+      @Override
+      public Flux<ChatRoomInfo> readChatRoomInfo()
+      {
+        return Flux.empty();
+      }
+
+      @Override
+      public void writeChatRoomData(UUID chatRoomId, Flux<Message> messageFlux) {}
+
+      @Override
+      public Flux<Message> readChatRoomData(UUID chatRoomId)
+      {
+        return Flux.empty();
+      }
+    };
+  }
+}
diff --git a/src/test/java/de/juplo/kafka/chat/backend/AbstractInMemoryStorageIT.java b/src/test/java/de/juplo/kafka/chat/backend/AbstractInMemoryStorageIT.java
new file mode 100644 (file)
index 0000000..3d311a4
--- /dev/null
@@ -0,0 +1,36 @@
+package de.juplo.kafka.chat.backend.persistence;
+
+import de.juplo.kafka.chat.backend.domain.ChatHomeService;
+import de.juplo.kafka.chat.backend.persistence.inmemory.SimpleChatHomeService;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
+import java.time.Clock;
+
+
+@RequiredArgsConstructor
+@Slf4j
+public abstract class AbstractInMemoryStorageIT extends AbstractStorageStrategyIT
+{
+  final Clock clock;
+
+  @Override
+  protected StorageStrategyITConfig getConfig()
+  {
+    return new StorageStrategyITConfig()
+    {
+      int bufferSize = 8;
+
+      SimpleChatHomeService simpleChatHome = new SimpleChatHomeService(
+          getStorageStrategy(),
+          clock,
+          bufferSize);
+
+      @Override
+      public ChatHomeService getChatHome()
+      {
+        return simpleChatHome;
+      }
+    };
+  }
+}
diff --git a/src/test/java/de/juplo/kafka/chat/backend/AbstractStorageStrategyIT.java b/src/test/java/de/juplo/kafka/chat/backend/AbstractStorageStrategyIT.java
new file mode 100644 (file)
index 0000000..c2149bb
--- /dev/null
@@ -0,0 +1,119 @@
+package de.juplo.kafka.chat.backend.persistence;
+
+import de.juplo.kafka.chat.backend.domain.*;
+import lombok.extern.slf4j.Slf4j;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+import java.util.UUID;
+
+import static pl.rzrz.assertj.reactor.Assertions.*;
+
+
+@Slf4j
+public abstract class AbstractStorageStrategyIT
+{
+  protected ChatHomeService chathome;
+
+
+  protected abstract StorageStrategy getStorageStrategy();
+  protected abstract StorageStrategyITConfig getConfig();
+
+  protected void start()
+  {
+    StorageStrategyITConfig config = getConfig();
+    chathome = config.getChatHome();
+  }
+
+  protected void stop()
+  {
+    getStorageStrategy().write(chathome);
+  }
+
+  @Test
+  protected void testStoreAndRecreate()
+  {
+    start();
+
+    assertThat(chathome.getChatRoomInfo().toStream()).hasSize(0);
+
+    UUID chatRoomId = UUID.fromString("5c73531c-6fc4-426c-adcb-afc5c140a0f7");
+    ChatRoomInfo info = chathome.createChatRoom(chatRoomId, "FOO").block();
+    log.debug("Created chat-room {}", info);
+    ChatRoomData chatroom = chathome.getChatRoomData(chatRoomId).block();
+    Message m1 = chatroom.addMessage(1l,"peter", "Hallo, ich heiße Peter!").block();
+    Message m2 = chatroom.addMessage(1l, "ute", "Ich bin Ute...").block();
+    Message m3 = chatroom.addMessage(2l, "peter", "Willst du mit mir gehen?").block();
+    Message m4 = chatroom.addMessage(1l, "klaus", "Ja? Nein? Vielleicht??").block();
+
+    assertThat(chathome.getChatRoomInfo().toStream()).containsExactlyElementsOf(List.of(info));
+    assertThat(chathome.getChatRoomInfo(chatRoomId)).emitsExactly(info);
+    assertThat(chathome
+        .getChatRoomData(chatRoomId)
+        .flatMapMany(cr -> cr.getMessages())).emitsExactly(m1, m2, m3, m4);
+
+    stop();
+    start();
+
+    assertThat(chathome.getChatRoomInfo().toStream()).containsExactlyElementsOf(List.of(info));
+    assertThat(chathome.getChatRoomInfo(chatRoomId)).emitsExactly(info);
+    assertThat(chathome
+        .getChatRoomData(chatRoomId)
+        .flatMapMany(cr -> cr.getMessages())).emitsExactly(m1, m2, m3, m4);
+  }
+
+  @Test
+  protected void testStoreAndRecreateParallelChatRooms()
+  {
+    start();
+
+    assertThat(chathome.getChatRoomInfo().toStream()).hasSize(0);
+
+    UUID chatRoomAId = UUID.fromString("5c73531c-6fc4-426c-adcb-afc5c140a0f7");
+    ChatRoomInfo infoA = chathome.createChatRoom(chatRoomAId, "FOO").block();
+    log.debug("Created chat-room {}", infoA);
+    ChatRoomData chatroomA = chathome.getChatRoomData(chatRoomAId).block();
+    Message ma1 = chatroomA.addMessage(1l,"peter", "Hallo, ich heiße Peter!").block();
+    Message ma2 = chatroomA.addMessage(1l, "ute", "Ich bin Ute...").block();
+    Message ma3 = chatroomA.addMessage(2l, "peter", "Willst du mit mir gehen?").block();
+    Message ma4 = chatroomA.addMessage(1l, "klaus", "Ja? Nein? Vielleicht??").block();
+
+    UUID chatRoomBId = UUID.fromString("8763dfdc-4dda-4a74-bea4-4b389177abea");
+    ChatRoomInfo infoB = chathome.createChatRoom(chatRoomBId, "BAR").block();
+    log.debug("Created chat-room {}", infoB);
+    ChatRoomData chatroomB = chathome.getChatRoomData(chatRoomBId).block();
+    Message mb1 = chatroomB.addMessage(1l,"peter", "Hallo, ich heiße Uwe!").block();
+    Message mb2 = chatroomB.addMessage(1l, "ute", "Ich bin Ute...").block();
+    Message mb3 = chatroomB.addMessage(1l, "klaus", "Willst du mit mir gehen?").block();
+    Message mb4 = chatroomB.addMessage(2l, "peter", "Hä? Was jetzt?!? Isch glohb isch höb ühn däjah vüh...").block();
+
+    assertThat(chathome.getChatRoomInfo().toStream()).containsExactlyInAnyOrderElementsOf(List.of(infoA, infoB));
+    assertThat(chathome.getChatRoomInfo(chatRoomAId)).emitsExactly(infoA);
+    assertThat(chathome
+        .getChatRoomData(chatRoomAId)
+        .flatMapMany(cr -> cr.getMessages())).emitsExactly(ma1, ma2, ma3, ma4);
+    assertThat(chathome.getChatRoomData(chatRoomBId)).emitsExactly(chatroomB);
+    assertThat(chathome
+        .getChatRoomData(chatRoomBId)
+        .flatMapMany(cr -> cr.getMessages())).emitsExactly(mb1, mb2, mb3, mb4);
+
+    stop();
+    start();
+
+    assertThat(chathome.getChatRoomInfo().toStream()).containsExactlyInAnyOrderElementsOf(List.of(infoA, infoB));
+    assertThat(chathome.getChatRoomInfo(chatRoomAId)).emitsExactly(infoA);
+    assertThat(chathome
+        .getChatRoomData(chatRoomAId)
+        .flatMapMany(cr -> cr.getMessages())).emitsExactly(ma1, ma2, ma3, ma4);
+    assertThat(chathome.getChatRoomInfo(chatRoomBId)).emitsExactly(infoB);
+    assertThat(chathome
+        .getChatRoomData(chatRoomBId)
+        .flatMapMany(cr -> cr.getMessages())).emitsExactly(mb1, mb2, mb3, mb4);
+  }
+
+
+  interface StorageStrategyITConfig
+  {
+    ChatHomeService getChatHome();
+  }
+}
diff --git a/src/test/java/de/juplo/kafka/chat/backend/InMemoryWithFilesStorageIT.java b/src/test/java/de/juplo/kafka/chat/backend/InMemoryWithFilesStorageIT.java
new file mode 100644 (file)
index 0000000..be40eed
--- /dev/null
@@ -0,0 +1,71 @@
+package de.juplo.kafka.chat.backend.persistence;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializationFeature;
+import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
+import de.juplo.kafka.chat.backend.persistence.storage.files.FilesStorageStrategy;
+import lombok.extern.slf4j.Slf4j;
+import org.junit.jupiter.api.BeforeEach;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.time.Clock;
+
+
+@Slf4j
+public class InMemoryWithFilesStorageIT extends AbstractInMemoryStorageIT
+{
+  final static Path path = Paths.get("target","files");
+
+  final ObjectMapper mapper;
+  final FilesStorageStrategy storageStrategy;
+
+
+  public InMemoryWithFilesStorageIT()
+  {
+    super(Clock.systemDefaultZone());
+    mapper = new ObjectMapper();
+    mapper.registerModule(new JavaTimeModule());
+    mapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
+    storageStrategy = new FilesStorageStrategy(
+        path,
+        chatRoomId -> 0,
+        mapper);
+  }
+
+
+  @Override
+  protected StorageStrategy getStorageStrategy()
+  {
+    return storageStrategy;
+  }
+
+  @BeforeEach
+  void reset() throws Exception
+  {
+    if (Files.exists(path))
+    {
+      Files
+          .walk(path)
+          .forEach(file ->
+          {
+            try
+            {
+              if (!file.equals(path))
+              {
+                log.debug("Deleting file {}", file);
+                Files.delete(file);
+              }
+            }
+            catch (IOException e)
+            {
+              throw new RuntimeException(e);
+            }
+          });
+      log.debug("Deleting data-directory {}", path);
+      Files.delete(path);
+    }
+  }
+}
diff --git a/src/test/java/de/juplo/kafka/chat/backend/InMemoryWithMongoDbStorageIT.java b/src/test/java/de/juplo/kafka/chat/backend/InMemoryWithMongoDbStorageIT.java
new file mode 100644 (file)
index 0000000..a0dab37
--- /dev/null
@@ -0,0 +1,107 @@
+package de.juplo.kafka.chat.backend.persistence;
+
+import de.juplo.kafka.chat.backend.persistence.InMemoryWithMongoDbStorageIT.DataSourceInitializer;
+import de.juplo.kafka.chat.backend.persistence.storage.mongodb.ChatRoomRepository;
+import de.juplo.kafka.chat.backend.persistence.storage.mongodb.MessageRepository;
+import de.juplo.kafka.chat.backend.persistence.storage.mongodb.MongoDbStorageStrategy;
+import lombok.extern.slf4j.Slf4j;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
+import org.springframework.boot.test.autoconfigure.data.mongo.AutoConfigureDataMongo;
+import org.springframework.boot.test.context.TestConfiguration;
+import org.springframework.context.ApplicationContextInitializer;
+import org.springframework.context.ConfigurableApplicationContext;
+import org.springframework.context.annotation.Bean;
+import org.springframework.test.context.ContextConfiguration;
+import org.springframework.test.context.junit.jupiter.SpringExtension;
+import org.springframework.test.context.support.TestPropertySourceUtils;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+
+import java.time.Clock;
+
+
+@Testcontainers
+@ExtendWith({SpringExtension.class})
+@EnableAutoConfiguration
+@AutoConfigureDataMongo
+@ContextConfiguration(initializers = DataSourceInitializer.class)
+@Slf4j
+public class InMemoryWithMongoDbStorageIT extends AbstractInMemoryStorageIT
+{
+  @Autowired
+  MongoDbStorageStrategy storageStrategy;
+  @Autowired
+  ChatRoomRepository chatRoomRepository;
+  @Autowired
+  MessageRepository messageRepository;
+
+
+  public InMemoryWithMongoDbStorageIT()
+  {
+    super(Clock.systemDefaultZone());
+  }
+
+
+  @Override
+  protected StorageStrategy getStorageStrategy()
+  {
+    return storageStrategy;
+  }
+
+  @TestConfiguration
+  static class InMemoryWithMongoDbStorageStrategyITConfig
+  {
+    @Bean
+    MongoDbStorageStrategy storageStrategy(
+        ChatRoomRepository chatRoomRepository,
+        MessageRepository messageRepository,
+        Clock clock)
+    {
+      return new MongoDbStorageStrategy(
+          chatRoomRepository,
+          messageRepository,
+          chatRoomId -> 0);
+    }
+
+    @Bean
+    Clock clock()
+    {
+      return Clock.systemDefaultZone();
+    }
+  }
+
+  private static final int MONGODB_PORT = 27017;
+
+  @Container
+  private static final GenericContainer CONTAINER =
+      new GenericContainer("mongo:6")
+          .withExposedPorts(MONGODB_PORT);
+
+  public static class DataSourceInitializer
+      implements ApplicationContextInitializer<ConfigurableApplicationContext>
+  {
+    @Override
+    public void initialize(ConfigurableApplicationContext applicationContext)
+    {
+      TestPropertySourceUtils.addInlinedPropertiesToEnvironment(
+          applicationContext,
+          "spring.data.mongodb.host=localhost",
+          "spring.data.mongodb.port=" + CONTAINER.getMappedPort(MONGODB_PORT),
+          "spring.data.mongodb.database=test");
+    }
+  }
+
+  @BeforeEach
+  void setUpLogging()
+  {
+    Slf4jLogConsumer logConsumer = new Slf4jLogConsumer(log);
+    CONTAINER.followOutput(logConsumer);
+    chatRoomRepository.deleteAll();
+    messageRepository.deleteAll();
+  }
+}
diff --git a/src/test/java/de/juplo/kafka/chat/backend/persistence/AbstractInMemoryStorageIT.java b/src/test/java/de/juplo/kafka/chat/backend/persistence/AbstractInMemoryStorageIT.java
deleted file mode 100644 (file)
index 3d311a4..0000000
+++ /dev/null
@@ -1,36 +0,0 @@
-package de.juplo.kafka.chat.backend.persistence;
-
-import de.juplo.kafka.chat.backend.domain.ChatHomeService;
-import de.juplo.kafka.chat.backend.persistence.inmemory.SimpleChatHomeService;
-import lombok.RequiredArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
-
-import java.time.Clock;
-
-
-@RequiredArgsConstructor
-@Slf4j
-public abstract class AbstractInMemoryStorageIT extends AbstractStorageStrategyIT
-{
-  final Clock clock;
-
-  @Override
-  protected StorageStrategyITConfig getConfig()
-  {
-    return new StorageStrategyITConfig()
-    {
-      int bufferSize = 8;
-
-      SimpleChatHomeService simpleChatHome = new SimpleChatHomeService(
-          getStorageStrategy(),
-          clock,
-          bufferSize);
-
-      @Override
-      public ChatHomeService getChatHome()
-      {
-        return simpleChatHome;
-      }
-    };
-  }
-}
diff --git a/src/test/java/de/juplo/kafka/chat/backend/persistence/AbstractStorageStrategyIT.java b/src/test/java/de/juplo/kafka/chat/backend/persistence/AbstractStorageStrategyIT.java
deleted file mode 100644 (file)
index c2149bb..0000000
+++ /dev/null
@@ -1,119 +0,0 @@
-package de.juplo.kafka.chat.backend.persistence;
-
-import de.juplo.kafka.chat.backend.domain.*;
-import lombok.extern.slf4j.Slf4j;
-import org.junit.jupiter.api.Test;
-
-import java.util.List;
-import java.util.UUID;
-
-import static pl.rzrz.assertj.reactor.Assertions.*;
-
-
-@Slf4j
-public abstract class AbstractStorageStrategyIT
-{
-  protected ChatHomeService chathome;
-
-
-  protected abstract StorageStrategy getStorageStrategy();
-  protected abstract StorageStrategyITConfig getConfig();
-
-  protected void start()
-  {
-    StorageStrategyITConfig config = getConfig();
-    chathome = config.getChatHome();
-  }
-
-  protected void stop()
-  {
-    getStorageStrategy().write(chathome);
-  }
-
-  @Test
-  protected void testStoreAndRecreate()
-  {
-    start();
-
-    assertThat(chathome.getChatRoomInfo().toStream()).hasSize(0);
-
-    UUID chatRoomId = UUID.fromString("5c73531c-6fc4-426c-adcb-afc5c140a0f7");
-    ChatRoomInfo info = chathome.createChatRoom(chatRoomId, "FOO").block();
-    log.debug("Created chat-room {}", info);
-    ChatRoomData chatroom = chathome.getChatRoomData(chatRoomId).block();
-    Message m1 = chatroom.addMessage(1l,"peter", "Hallo, ich heiße Peter!").block();
-    Message m2 = chatroom.addMessage(1l, "ute", "Ich bin Ute...").block();
-    Message m3 = chatroom.addMessage(2l, "peter", "Willst du mit mir gehen?").block();
-    Message m4 = chatroom.addMessage(1l, "klaus", "Ja? Nein? Vielleicht??").block();
-
-    assertThat(chathome.getChatRoomInfo().toStream()).containsExactlyElementsOf(List.of(info));
-    assertThat(chathome.getChatRoomInfo(chatRoomId)).emitsExactly(info);
-    assertThat(chathome
-        .getChatRoomData(chatRoomId)
-        .flatMapMany(cr -> cr.getMessages())).emitsExactly(m1, m2, m3, m4);
-
-    stop();
-    start();
-
-    assertThat(chathome.getChatRoomInfo().toStream()).containsExactlyElementsOf(List.of(info));
-    assertThat(chathome.getChatRoomInfo(chatRoomId)).emitsExactly(info);
-    assertThat(chathome
-        .getChatRoomData(chatRoomId)
-        .flatMapMany(cr -> cr.getMessages())).emitsExactly(m1, m2, m3, m4);
-  }
-
-  @Test
-  protected void testStoreAndRecreateParallelChatRooms()
-  {
-    start();
-
-    assertThat(chathome.getChatRoomInfo().toStream()).hasSize(0);
-
-    UUID chatRoomAId = UUID.fromString("5c73531c-6fc4-426c-adcb-afc5c140a0f7");
-    ChatRoomInfo infoA = chathome.createChatRoom(chatRoomAId, "FOO").block();
-    log.debug("Created chat-room {}", infoA);
-    ChatRoomData chatroomA = chathome.getChatRoomData(chatRoomAId).block();
-    Message ma1 = chatroomA.addMessage(1l,"peter", "Hallo, ich heiße Peter!").block();
-    Message ma2 = chatroomA.addMessage(1l, "ute", "Ich bin Ute...").block();
-    Message ma3 = chatroomA.addMessage(2l, "peter", "Willst du mit mir gehen?").block();
-    Message ma4 = chatroomA.addMessage(1l, "klaus", "Ja? Nein? Vielleicht??").block();
-
-    UUID chatRoomBId = UUID.fromString("8763dfdc-4dda-4a74-bea4-4b389177abea");
-    ChatRoomInfo infoB = chathome.createChatRoom(chatRoomBId, "BAR").block();
-    log.debug("Created chat-room {}", infoB);
-    ChatRoomData chatroomB = chathome.getChatRoomData(chatRoomBId).block();
-    Message mb1 = chatroomB.addMessage(1l,"peter", "Hallo, ich heiße Uwe!").block();
-    Message mb2 = chatroomB.addMessage(1l, "ute", "Ich bin Ute...").block();
-    Message mb3 = chatroomB.addMessage(1l, "klaus", "Willst du mit mir gehen?").block();
-    Message mb4 = chatroomB.addMessage(2l, "peter", "Hä? Was jetzt?!? Isch glohb isch höb ühn däjah vüh...").block();
-
-    assertThat(chathome.getChatRoomInfo().toStream()).containsExactlyInAnyOrderElementsOf(List.of(infoA, infoB));
-    assertThat(chathome.getChatRoomInfo(chatRoomAId)).emitsExactly(infoA);
-    assertThat(chathome
-        .getChatRoomData(chatRoomAId)
-        .flatMapMany(cr -> cr.getMessages())).emitsExactly(ma1, ma2, ma3, ma4);
-    assertThat(chathome.getChatRoomData(chatRoomBId)).emitsExactly(chatroomB);
-    assertThat(chathome
-        .getChatRoomData(chatRoomBId)
-        .flatMapMany(cr -> cr.getMessages())).emitsExactly(mb1, mb2, mb3, mb4);
-
-    stop();
-    start();
-
-    assertThat(chathome.getChatRoomInfo().toStream()).containsExactlyInAnyOrderElementsOf(List.of(infoA, infoB));
-    assertThat(chathome.getChatRoomInfo(chatRoomAId)).emitsExactly(infoA);
-    assertThat(chathome
-        .getChatRoomData(chatRoomAId)
-        .flatMapMany(cr -> cr.getMessages())).emitsExactly(ma1, ma2, ma3, ma4);
-    assertThat(chathome.getChatRoomInfo(chatRoomBId)).emitsExactly(infoB);
-    assertThat(chathome
-        .getChatRoomData(chatRoomBId)
-        .flatMapMany(cr -> cr.getMessages())).emitsExactly(mb1, mb2, mb3, mb4);
-  }
-
-
-  interface StorageStrategyITConfig
-  {
-    ChatHomeService getChatHome();
-  }
-}
diff --git a/src/test/java/de/juplo/kafka/chat/backend/persistence/InMemoryWithFilesStorageIT.java b/src/test/java/de/juplo/kafka/chat/backend/persistence/InMemoryWithFilesStorageIT.java
deleted file mode 100644 (file)
index be40eed..0000000
+++ /dev/null
@@ -1,71 +0,0 @@
-package de.juplo.kafka.chat.backend.persistence;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.SerializationFeature;
-import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
-import de.juplo.kafka.chat.backend.persistence.storage.files.FilesStorageStrategy;
-import lombok.extern.slf4j.Slf4j;
-import org.junit.jupiter.api.BeforeEach;
-
-import java.io.IOException;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.time.Clock;
-
-
-@Slf4j
-public class InMemoryWithFilesStorageIT extends AbstractInMemoryStorageIT
-{
-  final static Path path = Paths.get("target","files");
-
-  final ObjectMapper mapper;
-  final FilesStorageStrategy storageStrategy;
-
-
-  public InMemoryWithFilesStorageIT()
-  {
-    super(Clock.systemDefaultZone());
-    mapper = new ObjectMapper();
-    mapper.registerModule(new JavaTimeModule());
-    mapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
-    storageStrategy = new FilesStorageStrategy(
-        path,
-        chatRoomId -> 0,
-        mapper);
-  }
-
-
-  @Override
-  protected StorageStrategy getStorageStrategy()
-  {
-    return storageStrategy;
-  }
-
-  @BeforeEach
-  void reset() throws Exception
-  {
-    if (Files.exists(path))
-    {
-      Files
-          .walk(path)
-          .forEach(file ->
-          {
-            try
-            {
-              if (!file.equals(path))
-              {
-                log.debug("Deleting file {}", file);
-                Files.delete(file);
-              }
-            }
-            catch (IOException e)
-            {
-              throw new RuntimeException(e);
-            }
-          });
-      log.debug("Deleting data-directory {}", path);
-      Files.delete(path);
-    }
-  }
-}
diff --git a/src/test/java/de/juplo/kafka/chat/backend/persistence/InMemoryWithMongoDbStorageIT.java b/src/test/java/de/juplo/kafka/chat/backend/persistence/InMemoryWithMongoDbStorageIT.java
deleted file mode 100644 (file)
index a0dab37..0000000
+++ /dev/null
@@ -1,107 +0,0 @@
-package de.juplo.kafka.chat.backend.persistence;
-
-import de.juplo.kafka.chat.backend.persistence.InMemoryWithMongoDbStorageIT.DataSourceInitializer;
-import de.juplo.kafka.chat.backend.persistence.storage.mongodb.ChatRoomRepository;
-import de.juplo.kafka.chat.backend.persistence.storage.mongodb.MessageRepository;
-import de.juplo.kafka.chat.backend.persistence.storage.mongodb.MongoDbStorageStrategy;
-import lombok.extern.slf4j.Slf4j;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.extension.ExtendWith;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
-import org.springframework.boot.test.autoconfigure.data.mongo.AutoConfigureDataMongo;
-import org.springframework.boot.test.context.TestConfiguration;
-import org.springframework.context.ApplicationContextInitializer;
-import org.springframework.context.ConfigurableApplicationContext;
-import org.springframework.context.annotation.Bean;
-import org.springframework.test.context.ContextConfiguration;
-import org.springframework.test.context.junit.jupiter.SpringExtension;
-import org.springframework.test.context.support.TestPropertySourceUtils;
-import org.testcontainers.containers.GenericContainer;
-import org.testcontainers.containers.output.Slf4jLogConsumer;
-import org.testcontainers.junit.jupiter.Container;
-import org.testcontainers.junit.jupiter.Testcontainers;
-
-import java.time.Clock;
-
-
-@Testcontainers
-@ExtendWith({SpringExtension.class})
-@EnableAutoConfiguration
-@AutoConfigureDataMongo
-@ContextConfiguration(initializers = DataSourceInitializer.class)
-@Slf4j
-public class InMemoryWithMongoDbStorageIT extends AbstractInMemoryStorageIT
-{
-  @Autowired
-  MongoDbStorageStrategy storageStrategy;
-  @Autowired
-  ChatRoomRepository chatRoomRepository;
-  @Autowired
-  MessageRepository messageRepository;
-
-
-  public InMemoryWithMongoDbStorageIT()
-  {
-    super(Clock.systemDefaultZone());
-  }
-
-
-  @Override
-  protected StorageStrategy getStorageStrategy()
-  {
-    return storageStrategy;
-  }
-
-  @TestConfiguration
-  static class InMemoryWithMongoDbStorageStrategyITConfig
-  {
-    @Bean
-    MongoDbStorageStrategy storageStrategy(
-        ChatRoomRepository chatRoomRepository,
-        MessageRepository messageRepository,
-        Clock clock)
-    {
-      return new MongoDbStorageStrategy(
-          chatRoomRepository,
-          messageRepository,
-          chatRoomId -> 0);
-    }
-
-    @Bean
-    Clock clock()
-    {
-      return Clock.systemDefaultZone();
-    }
-  }
-
-  private static final int MONGODB_PORT = 27017;
-
-  @Container
-  private static final GenericContainer CONTAINER =
-      new GenericContainer("mongo:6")
-          .withExposedPorts(MONGODB_PORT);
-
-  public static class DataSourceInitializer
-      implements ApplicationContextInitializer<ConfigurableApplicationContext>
-  {
-    @Override
-    public void initialize(ConfigurableApplicationContext applicationContext)
-    {
-      TestPropertySourceUtils.addInlinedPropertiesToEnvironment(
-          applicationContext,
-          "spring.data.mongodb.host=localhost",
-          "spring.data.mongodb.port=" + CONTAINER.getMappedPort(MONGODB_PORT),
-          "spring.data.mongodb.database=test");
-    }
-  }
-
-  @BeforeEach
-  void setUpLogging()
-  {
-    Slf4jLogConsumer logConsumer = new Slf4jLogConsumer(log);
-    CONTAINER.followOutput(logConsumer);
-    chatRoomRepository.deleteAll();
-    messageRepository.deleteAll();
-  }
-}