feat: Implemented and tested `MongoDbStorageStrategy`
authorKai Moritz <kai@juplo.de>
Mon, 9 Jan 2023 21:49:44 +0000 (22:49 +0100)
committerKai Moritz <kai@juplo.de>
Tue, 10 Jan 2023 23:03:43 +0000 (00:03 +0100)
- Beware: The version 4.0.0 of Spring Data MongoDB that is included in
  Spring Boot 3.0.x does not work with version 4.8.1 of the MongoDB drivers
  that are included. Therefore, the version of the drivers was downgraded
  to 4.7.2 - See:
  https://docs.spring.io/spring-data/mongodb/docs/current/reference/html/#compatibility.matrix
- Also tried an upgrade of Spring Boot from 3.0.0 to 3.0.1, but without
  luck.
- Beware: Spring Boot 3.x does not include an autoconfigured embedded
  version of MongoDB for testing. It removed the autoconfiguration for
  `de.flapdoodle.embed:de.flapdoodle.embed.mongo` !

pom.xml
src/main/java/de/juplo/kafka/chat/backend/ChatBackendConfiguration.java
src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryChatHomeService.java
src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/ChatRoomRepository.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/ChatRoomTo.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/MessageTo.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/MongoDbStorageStrategy.java [new file with mode: 0644]
src/test/java/de/juplo/kafka/chat/backend/persistence/AbstractStorageStrategyIT.java
src/test/java/de/juplo/kafka/chat/backend/persistence/InMemoryWithMongoDbStorageStrategyIT.java [new file with mode: 0644]
src/test/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/MessageToTest.java [new file with mode: 0644]

diff --git a/pom.xml b/pom.xml
index 36974b7..4869cd5 100644 (file)
--- a/pom.xml
+++ b/pom.xml
@@ -5,7 +5,7 @@
        <parent>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-parent</artifactId>
-               <version>3.0.0</version>
+               <version>3.0.1</version>
                <relativePath/> <!-- lookup parent from repository -->
        </parent>
        <groupId>de.juplo.kafka</groupId>
        <properties>
                <java.version>17</java.version>
                <assertj-reactor.version>1.0.8</assertj-reactor.version>
+               <mongodb.version>4.7.2</mongodb.version>
        </properties>
+       <dependencyManagement>
+               <dependencies>
+                       <dependency>
+                               <groupId>org.testcontainers</groupId>
+                               <artifactId>testcontainers-bom</artifactId>
+                               <version>1.17.6</version>
+                               <type>pom</type>
+                               <scope>import</scope>
+                       </dependency>
+               </dependencies>
+       </dependencyManagement>
        <dependencies>
                <dependency>
                        <groupId>org.springframework.boot</groupId>
                        <artifactId>lombok</artifactId>
                        <optional>true</optional>
                </dependency>
+               <dependency>
+                       <groupId>org.springframework.boot</groupId>
+                       <artifactId>spring-boot-starter-data-mongodb</artifactId>
+               </dependency>
                <dependency>
                        <groupId>org.springframework.boot</groupId>
                        <artifactId>spring-boot-starter-test</artifactId>
                        <artifactId>mockito-core</artifactId>
                        <scope>test</scope>
                </dependency>
+               <dependency>
+                       <groupId>org.testcontainers</groupId>
+                       <artifactId>testcontainers</artifactId>
+                       <scope>test</scope>
+               </dependency>
+               <dependency>
+                       <groupId>org.testcontainers</groupId>
+                       <artifactId>junit-jupiter</artifactId>
+                       <scope>test</scope>
+               </dependency>
        </dependencies>
 
        <build>
index 87965ab..0cca42d 100644 (file)
@@ -10,6 +10,7 @@ import de.juplo.kafka.chat.backend.persistence.inmemory.InMemoryChatRoomService;
 import org.springframework.boot.context.properties.EnableConfigurationProperties;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
+import org.springframework.data.mongodb.repository.config.EnableReactiveMongoRepositories;
 
 import java.nio.file.Paths;
 import java.time.Clock;
@@ -17,6 +18,7 @@ import java.time.Clock;
 
 @Configuration
 @EnableConfigurationProperties(ChatBackendProperties.class)
+@EnableReactiveMongoRepositories
 public class ChatBackendConfiguration
 {
   @Bean
index 90044d0..674a84c 100644 (file)
@@ -28,7 +28,7 @@ public class InMemoryChatHomeService implements ChatHomeService<InMemoryChatRoom
   {
     log.debug("Creating ChatHomeService with buffer-size {} (for created ChatRoom's)", bufferSize);
     this.chatrooms = new HashMap<>();
-    chatroomFlux.subscribe(chatroom -> chatrooms.put(chatroom.getId(), chatroom));
+    chatroomFlux.toStream().forEach(chatroom -> chatrooms.put(chatroom.getId(), chatroom));
     this.clock = clock;
     this.bufferSize = bufferSize;
   }
diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/ChatRoomRepository.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/ChatRoomRepository.java
new file mode 100644 (file)
index 0000000..12e5b96
--- /dev/null
@@ -0,0 +1,8 @@
+package de.juplo.kafka.chat.backend.persistence.storage.mongodb;
+
+import org.springframework.data.mongodb.repository.MongoRepository;
+
+
+public interface ChatRoomRepository extends MongoRepository<ChatRoomTo, String>
+{
+}
diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/ChatRoomTo.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/ChatRoomTo.java
new file mode 100644 (file)
index 0000000..1ad8d17
--- /dev/null
@@ -0,0 +1,36 @@
+package de.juplo.kafka.chat.backend.persistence.storage.mongodb;
+
+import de.juplo.kafka.chat.backend.domain.ChatRoom;
+import lombok.*;
+import org.springframework.data.annotation.Id;
+import org.springframework.data.mongodb.core.mapping.Document;
+
+import java.util.List;
+
+
+@AllArgsConstructor
+@NoArgsConstructor
+@Getter(AccessLevel.PACKAGE)
+@Setter(AccessLevel.PACKAGE)
+@EqualsAndHashCode(of = { "id" })
+@ToString(of = { "id", "name" })
+@Document
+public class ChatRoomTo
+{
+  @Id
+  private String id;
+  private String name;
+  private List<MessageTo> messages;
+
+  public static ChatRoomTo from(ChatRoom chatroom)
+  {
+    return new ChatRoomTo(
+        chatroom.getId().toString(),
+        chatroom.getName(),
+        chatroom
+            .getMessages()
+            .map(MessageTo::from)
+            .collectList()
+            .block());
+  }
+}
diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/MessageTo.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/MessageTo.java
new file mode 100644 (file)
index 0000000..4f93695
--- /dev/null
@@ -0,0 +1,48 @@
+package de.juplo.kafka.chat.backend.persistence.storage.mongodb;
+
+import de.juplo.kafka.chat.backend.domain.Message;
+import lombok.*;
+
+import java.time.LocalDateTime;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+
+@AllArgsConstructor
+@NoArgsConstructor
+@Getter(AccessLevel.PACKAGE)
+@Setter(AccessLevel.PACKAGE)
+@EqualsAndHashCode(of = { "user", "id" })
+@ToString(of = { "user", "id" })
+class MessageTo
+{
+  final static Pattern SPLIT_ID = Pattern.compile("^([a-z-0-9]+)--([0-9]+)$");
+  private String id;
+  private Long serial;
+  private String time;
+  private String text;
+
+  Message toMessage()
+  {
+    Matcher matcher = SPLIT_ID.matcher(id);
+    if (!matcher.matches())
+      throw new RuntimeException("MessageTo with invalid ID: " + id);
+    Long messageId = Long.parseLong(matcher.group(2));
+    String user = matcher.group(1);
+    return new Message(
+        Message.MessageKey.of(user, messageId),
+        serial,
+        LocalDateTime.parse(time),
+        text);
+  }
+
+  static MessageTo from(Message message)
+  {
+    return
+        new MessageTo(
+             message.getUsername() + "--" + message.getId(),
+            message.getSerialNumber(),
+            message.getTimestamp().toString(),
+            message.getMessageText());
+  }
+}
diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/MongoDbStorageStrategy.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/MongoDbStorageStrategy.java
new file mode 100644 (file)
index 0000000..08ed93b
--- /dev/null
@@ -0,0 +1,47 @@
+package de.juplo.kafka.chat.backend.persistence.storage.mongodb;
+
+import de.juplo.kafka.chat.backend.domain.ChatRoom;
+import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
+import de.juplo.kafka.chat.backend.persistence.storage.files.ChatRoomServiceFactory;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import reactor.core.publisher.Flux;
+
+import java.time.Clock;
+import java.util.UUID;
+
+
+@RequiredArgsConstructor
+@Slf4j
+public class MongoDbStorageStrategy implements StorageStrategy
+{
+  private final ChatRoomRepository repository;
+  private final Clock clock;
+  private final int bufferSize;
+  private final ChatRoomServiceFactory factory;
+
+
+  @Override
+  public void write(Flux<ChatRoom> chatroomFlux)
+  {
+    chatroomFlux
+        .map(ChatRoomTo::from)
+        .subscribe(chatroomTo -> repository.save(chatroomTo));
+  }
+
+  @Override
+  public Flux<ChatRoom> read()
+  {
+    return Flux
+        .fromIterable(repository.findAll())
+        .map(chatRoomTo -> new ChatRoom(
+            UUID.fromString(chatRoomTo.getId()),
+            chatRoomTo.getName(),
+            clock,
+            factory.create(
+                Flux
+                    .fromIterable(chatRoomTo.getMessages())
+                    .map(messageTo -> messageTo.toMessage())),
+            bufferSize));
+  }
+}
index 549faad..78bdd27 100644 (file)
@@ -40,10 +40,10 @@ public abstract class AbstractStorageStrategyIT
     assertThat(chathome.getChatRooms().toStream()).hasSize(0);
 
     ChatRoom chatroom = chathome.createChatroom("FOO").block();
-    Message m1 = chatroom.addMessage(1l,"Peter", "Hallo, ich heiße Peter!").block();
-    Message m2 = chatroom.addMessage(1l, "Ute", "Ich bin Ute...").block();
-    Message m3 = chatroom.addMessage(2l, "Peter", "Willst du mit mir gehen?").block();
-    Message m4 = chatroom.addMessage(1l, "Klaus", "Ja? Nein? Vielleicht??").block();
+    Message m1 = chatroom.addMessage(1l,"peter", "Hallo, ich heiße Peter!").block();
+    Message m2 = chatroom.addMessage(1l, "ute", "Ich bin Ute...").block();
+    Message m3 = chatroom.addMessage(2l, "peter", "Willst du mit mir gehen?").block();
+    Message m4 = chatroom.addMessage(1l, "klaus", "Ja? Nein? Vielleicht??").block();
 
     assertThat(chathome.getChatRooms().toStream()).containsExactlyElementsOf(List.of(chatroom));
     assertThat(chathome.getChatRoom(chatroom.getId())).emitsExactly(chatroom);
diff --git a/src/test/java/de/juplo/kafka/chat/backend/persistence/InMemoryWithMongoDbStorageStrategyIT.java b/src/test/java/de/juplo/kafka/chat/backend/persistence/InMemoryWithMongoDbStorageStrategyIT.java
new file mode 100644 (file)
index 0000000..87296e0
--- /dev/null
@@ -0,0 +1,103 @@
+package de.juplo.kafka.chat.backend.persistence;
+
+import de.juplo.kafka.chat.backend.domain.ChatHomeService;
+import de.juplo.kafka.chat.backend.persistence.inmemory.InMemoryChatHomeService;
+import de.juplo.kafka.chat.backend.persistence.InMemoryWithMongoDbStorageStrategyIT.DataSourceInitializer;
+import de.juplo.kafka.chat.backend.persistence.inmemory.InMemoryChatRoomService;
+import de.juplo.kafka.chat.backend.persistence.storage.mongodb.ChatRoomRepository;
+import de.juplo.kafka.chat.backend.persistence.storage.mongodb.MongoDbStorageStrategy;
+import lombok.extern.slf4j.Slf4j;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
+import org.springframework.boot.test.autoconfigure.data.mongo.AutoConfigureDataMongo;
+import org.springframework.boot.test.context.TestConfiguration;
+import org.springframework.context.ApplicationContextInitializer;
+import org.springframework.context.ConfigurableApplicationContext;
+import org.springframework.context.annotation.Bean;
+import org.springframework.data.mongodb.core.MongoTemplate;
+import org.springframework.test.context.ContextConfiguration;
+import org.springframework.test.context.junit.jupiter.SpringExtension;
+import org.springframework.test.context.support.TestPropertySourceUtils;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+
+import java.time.Clock;
+import java.util.function.Supplier;
+
+
+@Testcontainers
+@ExtendWith({SpringExtension.class})
+@EnableAutoConfiguration
+@AutoConfigureDataMongo
+@ContextConfiguration(initializers = DataSourceInitializer.class)
+@Slf4j
+public class InMemoryWithMongoDbStorageStrategyIT extends AbstractStorageStrategyIT
+{
+  @Autowired
+  MongoDbStorageStrategy storageStrategy;
+  @Autowired
+  Clock clock;
+
+
+  @Override
+  protected StorageStrategy getStorageStrategy()
+  {
+    return storageStrategy;
+  }
+
+  @Override
+  protected Supplier<ChatHomeService> chatHomeServiceSupplier()
+  {
+    return () -> new InMemoryChatHomeService(getStorageStrategy().read(), clock, 8);
+  }
+
+
+  @TestConfiguration
+  static class InMemoryWithMongoDbStorageStrategyITConfig
+  {
+    @Bean
+    MongoDbStorageStrategy storageStrategy(
+        ChatRoomRepository chatRoomRepository,
+        Clock clock)
+    {
+      return new MongoDbStorageStrategy(
+          chatRoomRepository,
+          clock,
+          8,
+          messageFlux -> new InMemoryChatRoomService(messageFlux));
+    }
+
+    @Bean
+    Clock clock()
+    {
+      return Clock.systemDefaultZone();
+    }
+  }
+
+  private static final int MONGODB_PORT = 27017;
+
+  @Container
+  private static final GenericContainer CONTAINER =
+      new GenericContainer("mongo:6")
+          .withEnv("MONGO_INITDB_DATABASE", "test")
+          .withExposedPorts(MONGODB_PORT);
+
+  public static class DataSourceInitializer
+      implements ApplicationContextInitializer<ConfigurableApplicationContext>
+  {
+    @Override
+    public void initialize(ConfigurableApplicationContext applicationContext)
+    {
+      TestPropertySourceUtils.addInlinedPropertiesToEnvironment(
+          applicationContext,
+          "spring.data.mongodb.host=localhost",
+          "spring.data.mongodb.port=" + CONTAINER.getMappedPort(MONGODB_PORT),
+          "spring.data.mongodb.database=test");
+
+      Slf4jLogConsumer logConsumer = new Slf4jLogConsumer(log);
+      CONTAINER.followOutput(logConsumer);    }
+  }
+}
diff --git a/src/test/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/MessageToTest.java b/src/test/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/MessageToTest.java
new file mode 100644 (file)
index 0000000..33a8a50
--- /dev/null
@@ -0,0 +1,37 @@
+package de.juplo.kafka.chat.backend.persistence.storage.mongodb;
+
+import de.juplo.kafka.chat.backend.domain.Message;
+import org.junit.jupiter.api.Test;
+
+import java.time.LocalDateTime;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+
+public class MessageToTest
+{
+  @Test
+  void testFrom()
+  {
+    Message message = new Message(
+        Message.MessageKey.of("ute", 1l),
+        6l,
+        LocalDateTime.now(),
+        "foo");
+    MessageTo messageTo = MessageTo.from(message);
+    assertThat(messageTo.getId()).isEqualTo("ute--1");
+  }
+
+  @Test
+  void testToMessage()
+  {
+    MessageTo messageTo = new MessageTo(
+        "ute--1",
+        6l,
+        LocalDateTime.now().toString(),
+        "foo");
+    Message message = messageTo.toMessage();
+    assertThat(message.getId()).isEqualTo(1l);
+    assertThat(message.getUsername()).isEqualTo("ute");
+  }
+}