feat: Implemented and tested `MongoDbStorageStrategy`
authorKai Moritz <kai@juplo.de>
Mon, 9 Jan 2023 21:49:44 +0000 (22:49 +0100)
committerKai Moritz <kai@juplo.de>
Sun, 15 Jan 2023 18:37:53 +0000 (19:37 +0100)
- Beware: The version 4.0.0 of Spring Data MongoDB that is included in
  Spring Boot 3.0.x does not work with version 4.8.1 of the MongoDB drivers
  that are included. Therefore, the version of the drivers was downgraded
  to 4.7.2 - See:
  https://docs.spring.io/spring-data/mongodb/docs/current/reference/html/#compatibility.matrix
- Also tried an upgrade of Spring Boot from 3.0.0 to 3.0.1, but without
  luck.
- Beware: Spring Boot 3.x does not include an autoconfigured embedded
  version of MongoDB for testing. It removed the autoconfiguration for
  `de.flapdoodle.embed:de.flapdoodle.embed.mongo` !

pom.xml
src/main/java/de/juplo/kafka/chat/backend/ChatBackendConfiguration.java
src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryChatHomeService.java
src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/ChatRoomRepository.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/ChatRoomTo.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/MessageTo.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/MongoDbStorageStrategy.java [new file with mode: 0644]
src/test/java/de/juplo/kafka/chat/backend/persistence/AbstractStorageStrategyIT.java
src/test/java/de/juplo/kafka/chat/backend/persistence/InMemoryWithFilesStorageStrategyIT.java
src/test/java/de/juplo/kafka/chat/backend/persistence/InMemoryWithMongoDbStorageStrategyIT.java [new file with mode: 0644]
src/test/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/MessageToTest.java [new file with mode: 0644]

diff --git a/pom.xml b/pom.xml
index fc9e763..d460239 100644 (file)
--- a/pom.xml
+++ b/pom.xml
@@ -5,7 +5,7 @@
   <parent>
     <groupId>org.springframework.boot</groupId>
     <artifactId>spring-boot-starter-parent</artifactId>
-    <version>3.0.0</version>
+    <version>3.0.1</version>
     <relativePath/> <!-- lookup parent from repository -->
   </parent>
   <groupId>de.juplo.kafka</groupId>
   <properties>
     <java.version>17</java.version>
     <assertj-reactor.version>1.0.8</assertj-reactor.version>
+    <mongodb.version>4.7.2</mongodb.version>
   </properties>
+  <dependencyManagement>
+    <dependencies>
+      <dependency>
+        <groupId>org.testcontainers</groupId>
+        <artifactId>testcontainers-bom</artifactId>
+        <version>1.17.6</version>
+        <type>pom</type>
+        <scope>import</scope>
+      </dependency>
+    </dependencies>
+  </dependencyManagement>
   <dependencies>
     <dependency>
       <groupId>org.springframework.boot</groupId>
       <artifactId>lombok</artifactId>
       <optional>true</optional>
     </dependency>
+    <dependency>
+      <groupId>org.springframework.boot</groupId>
+      <artifactId>spring-boot-starter-data-mongodb</artifactId>
+    </dependency>
     <dependency>
       <groupId>org.springframework.boot</groupId>
       <artifactId>spring-boot-starter-test</artifactId>
       <artifactId>mockito-core</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.testcontainers</groupId>
+      <artifactId>testcontainers</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.testcontainers</groupId>
+      <artifactId>junit-jupiter</artifactId>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 
   <build>
index 87965ab..0cca42d 100644 (file)
@@ -10,6 +10,7 @@ import de.juplo.kafka.chat.backend.persistence.inmemory.InMemoryChatRoomService;
 import org.springframework.boot.context.properties.EnableConfigurationProperties;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
+import org.springframework.data.mongodb.repository.config.EnableReactiveMongoRepositories;
 
 import java.nio.file.Paths;
 import java.time.Clock;
@@ -17,6 +18,7 @@ import java.time.Clock;
 
 @Configuration
 @EnableConfigurationProperties(ChatBackendProperties.class)
+@EnableReactiveMongoRepositories
 public class ChatBackendConfiguration
 {
   @Bean
index 90044d0..674a84c 100644 (file)
@@ -28,7 +28,7 @@ public class InMemoryChatHomeService implements ChatHomeService<InMemoryChatRoom
   {
     log.debug("Creating ChatHomeService with buffer-size {} (for created ChatRoom's)", bufferSize);
     this.chatrooms = new HashMap<>();
-    chatroomFlux.subscribe(chatroom -> chatrooms.put(chatroom.getId(), chatroom));
+    chatroomFlux.toStream().forEach(chatroom -> chatrooms.put(chatroom.getId(), chatroom));
     this.clock = clock;
     this.bufferSize = bufferSize;
   }
diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/ChatRoomRepository.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/ChatRoomRepository.java
new file mode 100644 (file)
index 0000000..12e5b96
--- /dev/null
@@ -0,0 +1,8 @@
+package de.juplo.kafka.chat.backend.persistence.storage.mongodb;
+
+import org.springframework.data.mongodb.repository.MongoRepository;
+
+
+public interface ChatRoomRepository extends MongoRepository<ChatRoomTo, String>
+{
+}
diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/ChatRoomTo.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/ChatRoomTo.java
new file mode 100644 (file)
index 0000000..1ad8d17
--- /dev/null
@@ -0,0 +1,36 @@
+package de.juplo.kafka.chat.backend.persistence.storage.mongodb;
+
+import de.juplo.kafka.chat.backend.domain.ChatRoom;
+import lombok.*;
+import org.springframework.data.annotation.Id;
+import org.springframework.data.mongodb.core.mapping.Document;
+
+import java.util.List;
+
+
+@AllArgsConstructor
+@NoArgsConstructor
+@Getter(AccessLevel.PACKAGE)
+@Setter(AccessLevel.PACKAGE)
+@EqualsAndHashCode(of = { "id" })
+@ToString(of = { "id", "name" })
+@Document
+public class ChatRoomTo
+{
+  @Id
+  private String id;
+  private String name;
+  private List<MessageTo> messages;
+
+  public static ChatRoomTo from(ChatRoom chatroom)
+  {
+    return new ChatRoomTo(
+        chatroom.getId().toString(),
+        chatroom.getName(),
+        chatroom
+            .getMessages()
+            .map(MessageTo::from)
+            .collectList()
+            .block());
+  }
+}
diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/MessageTo.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/MessageTo.java
new file mode 100644 (file)
index 0000000..4f93695
--- /dev/null
@@ -0,0 +1,48 @@
+package de.juplo.kafka.chat.backend.persistence.storage.mongodb;
+
+import de.juplo.kafka.chat.backend.domain.Message;
+import lombok.*;
+
+import java.time.LocalDateTime;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+
+@AllArgsConstructor
+@NoArgsConstructor
+@Getter(AccessLevel.PACKAGE)
+@Setter(AccessLevel.PACKAGE)
+@EqualsAndHashCode(of = { "user", "id" })
+@ToString(of = { "user", "id" })
+class MessageTo
+{
+  final static Pattern SPLIT_ID = Pattern.compile("^([a-z-0-9]+)--([0-9]+)$");
+  private String id;
+  private Long serial;
+  private String time;
+  private String text;
+
+  Message toMessage()
+  {
+    Matcher matcher = SPLIT_ID.matcher(id);
+    if (!matcher.matches())
+      throw new RuntimeException("MessageTo with invalid ID: " + id);
+    Long messageId = Long.parseLong(matcher.group(2));
+    String user = matcher.group(1);
+    return new Message(
+        Message.MessageKey.of(user, messageId),
+        serial,
+        LocalDateTime.parse(time),
+        text);
+  }
+
+  static MessageTo from(Message message)
+  {
+    return
+        new MessageTo(
+             message.getUsername() + "--" + message.getId(),
+            message.getSerialNumber(),
+            message.getTimestamp().toString(),
+            message.getMessageText());
+  }
+}
diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/MongoDbStorageStrategy.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/MongoDbStorageStrategy.java
new file mode 100644 (file)
index 0000000..08ed93b
--- /dev/null
@@ -0,0 +1,47 @@
+package de.juplo.kafka.chat.backend.persistence.storage.mongodb;
+
+import de.juplo.kafka.chat.backend.domain.ChatRoom;
+import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
+import de.juplo.kafka.chat.backend.persistence.storage.files.ChatRoomServiceFactory;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import reactor.core.publisher.Flux;
+
+import java.time.Clock;
+import java.util.UUID;
+
+
+@RequiredArgsConstructor
+@Slf4j
+public class MongoDbStorageStrategy implements StorageStrategy
+{
+  private final ChatRoomRepository repository;
+  private final Clock clock;
+  private final int bufferSize;
+  private final ChatRoomServiceFactory factory;
+
+
+  @Override
+  public void write(Flux<ChatRoom> chatroomFlux)
+  {
+    chatroomFlux
+        .map(ChatRoomTo::from)
+        .subscribe(chatroomTo -> repository.save(chatroomTo));
+  }
+
+  @Override
+  public Flux<ChatRoom> read()
+  {
+    return Flux
+        .fromIterable(repository.findAll())
+        .map(chatRoomTo -> new ChatRoom(
+            UUID.fromString(chatRoomTo.getId()),
+            chatRoomTo.getName(),
+            clock,
+            factory.create(
+                Flux
+                    .fromIterable(chatRoomTo.getMessages())
+                    .map(messageTo -> messageTo.toMessage())),
+            bufferSize));
+  }
+}
index 549faad..dc998ab 100644 (file)
@@ -20,11 +20,11 @@ public abstract class AbstractStorageStrategyIT
 
 
   protected abstract StorageStrategy getStorageStrategy();
-  protected abstract Supplier<ChatHomeService> chatHomeServiceSupplier();
+  protected abstract Supplier<ChatHomeService> getChatHomeServiceSupplier();
 
   protected void start()
   {
-    chathome = new ChatHome(chatHomeServiceSupplier().get());
+    chathome = new ChatHome(getChatHomeServiceSupplier().get());
   }
 
   protected void stop()
@@ -40,10 +40,10 @@ public abstract class AbstractStorageStrategyIT
     assertThat(chathome.getChatRooms().toStream()).hasSize(0);
 
     ChatRoom chatroom = chathome.createChatroom("FOO").block();
-    Message m1 = chatroom.addMessage(1l,"Peter", "Hallo, ich heiße Peter!").block();
-    Message m2 = chatroom.addMessage(1l, "Ute", "Ich bin Ute...").block();
-    Message m3 = chatroom.addMessage(2l, "Peter", "Willst du mit mir gehen?").block();
-    Message m4 = chatroom.addMessage(1l, "Klaus", "Ja? Nein? Vielleicht??").block();
+    Message m1 = chatroom.addMessage(1l,"peter", "Hallo, ich heiße Peter!").block();
+    Message m2 = chatroom.addMessage(1l, "ute", "Ich bin Ute...").block();
+    Message m3 = chatroom.addMessage(2l, "peter", "Willst du mit mir gehen?").block();
+    Message m4 = chatroom.addMessage(1l, "klaus", "Ja? Nein? Vielleicht??").block();
 
     assertThat(chathome.getChatRooms().toStream()).containsExactlyElementsOf(List.of(chatroom));
     assertThat(chathome.getChatRoom(chatroom.getId())).emitsExactly(chatroom);
index 93bfc8f..5c88f10 100644 (file)
@@ -50,7 +50,7 @@ public class InMemoryWithFilesStorageStrategyIT extends AbstractStorageStrategyI
   }
 
   @Override
-  protected Supplier<ChatHomeService> chatHomeServiceSupplier()
+  protected Supplier<ChatHomeService> getChatHomeServiceSupplier()
   {
     return () -> new InMemoryChatHomeService(getStorageStrategy().read(), clock, 8);
   }
diff --git a/src/test/java/de/juplo/kafka/chat/backend/persistence/InMemoryWithMongoDbStorageStrategyIT.java b/src/test/java/de/juplo/kafka/chat/backend/persistence/InMemoryWithMongoDbStorageStrategyIT.java
new file mode 100644 (file)
index 0000000..d094808
--- /dev/null
@@ -0,0 +1,103 @@
+package de.juplo.kafka.chat.backend.persistence;
+
+import de.juplo.kafka.chat.backend.domain.ChatHomeService;
+import de.juplo.kafka.chat.backend.persistence.inmemory.InMemoryChatHomeService;
+import de.juplo.kafka.chat.backend.persistence.InMemoryWithMongoDbStorageStrategyIT.DataSourceInitializer;
+import de.juplo.kafka.chat.backend.persistence.inmemory.InMemoryChatRoomService;
+import de.juplo.kafka.chat.backend.persistence.storage.mongodb.ChatRoomRepository;
+import de.juplo.kafka.chat.backend.persistence.storage.mongodb.MongoDbStorageStrategy;
+import lombok.extern.slf4j.Slf4j;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
+import org.springframework.boot.test.autoconfigure.data.mongo.AutoConfigureDataMongo;
+import org.springframework.boot.test.context.TestConfiguration;
+import org.springframework.context.ApplicationContextInitializer;
+import org.springframework.context.ConfigurableApplicationContext;
+import org.springframework.context.annotation.Bean;
+import org.springframework.data.mongodb.core.MongoTemplate;
+import org.springframework.test.context.ContextConfiguration;
+import org.springframework.test.context.junit.jupiter.SpringExtension;
+import org.springframework.test.context.support.TestPropertySourceUtils;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+
+import java.time.Clock;
+import java.util.function.Supplier;
+
+
+@Testcontainers
+@ExtendWith({SpringExtension.class})
+@EnableAutoConfiguration
+@AutoConfigureDataMongo
+@ContextConfiguration(initializers = DataSourceInitializer.class)
+@Slf4j
+public class InMemoryWithMongoDbStorageStrategyIT extends AbstractStorageStrategyIT
+{
+  @Autowired
+  MongoDbStorageStrategy storageStrategy;
+  @Autowired
+  Clock clock;
+
+
+  @Override
+  protected StorageStrategy getStorageStrategy()
+  {
+    return storageStrategy;
+  }
+
+  @Override
+  protected Supplier<ChatHomeService> getChatHomeServiceSupplier()
+  {
+    return () -> new InMemoryChatHomeService(getStorageStrategy().read(), clock, 8);
+  }
+
+
+  @TestConfiguration
+  static class InMemoryWithMongoDbStorageStrategyITConfig
+  {
+    @Bean
+    MongoDbStorageStrategy storageStrategy(
+        ChatRoomRepository chatRoomRepository,
+        Clock clock)
+    {
+      return new MongoDbStorageStrategy(
+          chatRoomRepository,
+          clock,
+          8,
+          messageFlux -> new InMemoryChatRoomService(messageFlux));
+    }
+
+    @Bean
+    Clock clock()
+    {
+      return Clock.systemDefaultZone();
+    }
+  }
+
+  private static final int MONGODB_PORT = 27017;
+
+  @Container
+  private static final GenericContainer CONTAINER =
+      new GenericContainer("mongo:6")
+          .withEnv("MONGO_INITDB_DATABASE", "test")
+          .withExposedPorts(MONGODB_PORT);
+
+  public static class DataSourceInitializer
+      implements ApplicationContextInitializer<ConfigurableApplicationContext>
+  {
+    @Override
+    public void initialize(ConfigurableApplicationContext applicationContext)
+    {
+      TestPropertySourceUtils.addInlinedPropertiesToEnvironment(
+          applicationContext,
+          "spring.data.mongodb.host=localhost",
+          "spring.data.mongodb.port=" + CONTAINER.getMappedPort(MONGODB_PORT),
+          "spring.data.mongodb.database=test");
+
+      Slf4jLogConsumer logConsumer = new Slf4jLogConsumer(log);
+      CONTAINER.followOutput(logConsumer);    }
+  }
+}
diff --git a/src/test/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/MessageToTest.java b/src/test/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/MessageToTest.java
new file mode 100644 (file)
index 0000000..33a8a50
--- /dev/null
@@ -0,0 +1,37 @@
+package de.juplo.kafka.chat.backend.persistence.storage.mongodb;
+
+import de.juplo.kafka.chat.backend.domain.Message;
+import org.junit.jupiter.api.Test;
+
+import java.time.LocalDateTime;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+
+public class MessageToTest
+{
+  @Test
+  void testFrom()
+  {
+    Message message = new Message(
+        Message.MessageKey.of("ute", 1l),
+        6l,
+        LocalDateTime.now(),
+        "foo");
+    MessageTo messageTo = MessageTo.from(message);
+    assertThat(messageTo.getId()).isEqualTo("ute--1");
+  }
+
+  @Test
+  void testToMessage()
+  {
+    MessageTo messageTo = new MessageTo(
+        "ute--1",
+        6l,
+        LocalDateTime.now().toString(),
+        "foo");
+    Message message = messageTo.toMessage();
+    assertThat(message.getId()).isEqualTo(1l);
+    assertThat(message.getUsername()).isEqualTo("ute");
+  }
+}