]> juplo.de Git - demos/kafka/chat/commitdiff
feat: Implemented and tested `MongoDbStorageStrategy`
authorKai Moritz <kai@juplo.de>
Mon, 9 Jan 2023 21:49:44 +0000 (22:49 +0100)
committerKai Moritz <kai@juplo.de>
Sun, 15 Jan 2023 18:37:53 +0000 (19:37 +0100)
- Beware: The version 4.0.0 of Spring Data MongoDB that is included in
  Spring Boot 3.0.x does not work with version 4.8.1 of the MongoDB drivers
  that are included. Therefore, the version of the drivers was downgraded
  to 4.7.2 - See:
  https://docs.spring.io/spring-data/mongodb/docs/current/reference/html/#compatibility.matrix
- Also tried an upgrade of Spring Boot from 3.0.0 to 3.0.1, but without
  luck.
- Beware: Spring Boot 3.x does not include an autoconfigured embedded
  version of MongoDB for testing. It removed the autoconfiguration for
  `de.flapdoodle.embed:de.flapdoodle.embed.mongo` !

pom.xml
src/main/java/de/juplo/kafka/chat/backend/ChatBackendConfiguration.java
src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryChatHomeService.java
src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/ChatRoomRepository.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/ChatRoomTo.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/MessageTo.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/MongoDbStorageStrategy.java [new file with mode: 0644]
src/test/java/de/juplo/kafka/chat/backend/persistence/AbstractStorageStrategyIT.java
src/test/java/de/juplo/kafka/chat/backend/persistence/InMemoryWithFilesStorageStrategyIT.java
src/test/java/de/juplo/kafka/chat/backend/persistence/InMemoryWithMongoDbStorageStrategyIT.java [new file with mode: 0644]
src/test/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/MessageToTest.java [new file with mode: 0644]

diff --git a/pom.xml b/pom.xml
index fc9e763394906d26adf7d3196e2001ad6858e7bd..d460239e9621e676722963b7ce276c1d5c0d0681 100644 (file)
--- a/pom.xml
+++ b/pom.xml
@@ -5,7 +5,7 @@
   <parent>
     <groupId>org.springframework.boot</groupId>
     <artifactId>spring-boot-starter-parent</artifactId>
-    <version>3.0.0</version>
+    <version>3.0.1</version>
     <relativePath/> <!-- lookup parent from repository -->
   </parent>
   <groupId>de.juplo.kafka</groupId>
   <properties>
     <java.version>17</java.version>
     <assertj-reactor.version>1.0.8</assertj-reactor.version>
+    <mongodb.version>4.7.2</mongodb.version>
   </properties>
+  <dependencyManagement>
+    <dependencies>
+      <dependency>
+        <groupId>org.testcontainers</groupId>
+        <artifactId>testcontainers-bom</artifactId>
+        <version>1.17.6</version>
+        <type>pom</type>
+        <scope>import</scope>
+      </dependency>
+    </dependencies>
+  </dependencyManagement>
   <dependencies>
     <dependency>
       <groupId>org.springframework.boot</groupId>
       <artifactId>lombok</artifactId>
       <optional>true</optional>
     </dependency>
+    <dependency>
+      <groupId>org.springframework.boot</groupId>
+      <artifactId>spring-boot-starter-data-mongodb</artifactId>
+    </dependency>
     <dependency>
       <groupId>org.springframework.boot</groupId>
       <artifactId>spring-boot-starter-test</artifactId>
       <artifactId>mockito-core</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.testcontainers</groupId>
+      <artifactId>testcontainers</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.testcontainers</groupId>
+      <artifactId>junit-jupiter</artifactId>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 
   <build>
index 87965ab920bbec454adaa7e9485a95a1022493c6..0cca42d3ac76e5f67627e79bd3991b51e3083302 100644 (file)
@@ -10,6 +10,7 @@ import de.juplo.kafka.chat.backend.persistence.inmemory.InMemoryChatRoomService;
 import org.springframework.boot.context.properties.EnableConfigurationProperties;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
+import org.springframework.data.mongodb.repository.config.EnableReactiveMongoRepositories;
 
 import java.nio.file.Paths;
 import java.time.Clock;
@@ -17,6 +18,7 @@ import java.time.Clock;
 
 @Configuration
 @EnableConfigurationProperties(ChatBackendProperties.class)
+@EnableReactiveMongoRepositories
 public class ChatBackendConfiguration
 {
   @Bean
index 90044d0f4fda6c5cfd2366e921319999e8a2f66b..674a84c0d28a3f1aa2df619a4f86b144e23aa908 100644 (file)
@@ -28,7 +28,7 @@ public class InMemoryChatHomeService implements ChatHomeService<InMemoryChatRoom
   {
     log.debug("Creating ChatHomeService with buffer-size {} (for created ChatRoom's)", bufferSize);
     this.chatrooms = new HashMap<>();
-    chatroomFlux.subscribe(chatroom -> chatrooms.put(chatroom.getId(), chatroom));
+    chatroomFlux.toStream().forEach(chatroom -> chatrooms.put(chatroom.getId(), chatroom));
     this.clock = clock;
     this.bufferSize = bufferSize;
   }
diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/ChatRoomRepository.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/ChatRoomRepository.java
new file mode 100644 (file)
index 0000000..12e5b96
--- /dev/null
@@ -0,0 +1,8 @@
+package de.juplo.kafka.chat.backend.persistence.storage.mongodb;
+
+import org.springframework.data.mongodb.repository.MongoRepository;
+
+
+public interface ChatRoomRepository extends MongoRepository<ChatRoomTo, String>
+{
+}
diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/ChatRoomTo.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/ChatRoomTo.java
new file mode 100644 (file)
index 0000000..1ad8d17
--- /dev/null
@@ -0,0 +1,36 @@
+package de.juplo.kafka.chat.backend.persistence.storage.mongodb;
+
+import de.juplo.kafka.chat.backend.domain.ChatRoom;
+import lombok.*;
+import org.springframework.data.annotation.Id;
+import org.springframework.data.mongodb.core.mapping.Document;
+
+import java.util.List;
+
+
+@AllArgsConstructor
+@NoArgsConstructor
+@Getter(AccessLevel.PACKAGE)
+@Setter(AccessLevel.PACKAGE)
+@EqualsAndHashCode(of = { "id" })
+@ToString(of = { "id", "name" })
+@Document
+public class ChatRoomTo
+{
+  @Id
+  private String id;
+  private String name;
+  private List<MessageTo> messages;
+
+  public static ChatRoomTo from(ChatRoom chatroom)
+  {
+    return new ChatRoomTo(
+        chatroom.getId().toString(),
+        chatroom.getName(),
+        chatroom
+            .getMessages()
+            .map(MessageTo::from)
+            .collectList()
+            .block());
+  }
+}
diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/MessageTo.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/MessageTo.java
new file mode 100644 (file)
index 0000000..4f93695
--- /dev/null
@@ -0,0 +1,48 @@
+package de.juplo.kafka.chat.backend.persistence.storage.mongodb;
+
+import de.juplo.kafka.chat.backend.domain.Message;
+import lombok.*;
+
+import java.time.LocalDateTime;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+
+@AllArgsConstructor
+@NoArgsConstructor
+@Getter(AccessLevel.PACKAGE)
+@Setter(AccessLevel.PACKAGE)
+@EqualsAndHashCode(of = { "user", "id" })
+@ToString(of = { "user", "id" })
+class MessageTo
+{
+  final static Pattern SPLIT_ID = Pattern.compile("^([a-z-0-9]+)--([0-9]+)$");
+  private String id;
+  private Long serial;
+  private String time;
+  private String text;
+
+  Message toMessage()
+  {
+    Matcher matcher = SPLIT_ID.matcher(id);
+    if (!matcher.matches())
+      throw new RuntimeException("MessageTo with invalid ID: " + id);
+    Long messageId = Long.parseLong(matcher.group(2));
+    String user = matcher.group(1);
+    return new Message(
+        Message.MessageKey.of(user, messageId),
+        serial,
+        LocalDateTime.parse(time),
+        text);
+  }
+
+  static MessageTo from(Message message)
+  {
+    return
+        new MessageTo(
+             message.getUsername() + "--" + message.getId(),
+            message.getSerialNumber(),
+            message.getTimestamp().toString(),
+            message.getMessageText());
+  }
+}
diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/MongoDbStorageStrategy.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/MongoDbStorageStrategy.java
new file mode 100644 (file)
index 0000000..08ed93b
--- /dev/null
@@ -0,0 +1,47 @@
+package de.juplo.kafka.chat.backend.persistence.storage.mongodb;
+
+import de.juplo.kafka.chat.backend.domain.ChatRoom;
+import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
+import de.juplo.kafka.chat.backend.persistence.storage.files.ChatRoomServiceFactory;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import reactor.core.publisher.Flux;
+
+import java.time.Clock;
+import java.util.UUID;
+
+
+@RequiredArgsConstructor
+@Slf4j
+public class MongoDbStorageStrategy implements StorageStrategy
+{
+  private final ChatRoomRepository repository;
+  private final Clock clock;
+  private final int bufferSize;
+  private final ChatRoomServiceFactory factory;
+
+
+  @Override
+  public void write(Flux<ChatRoom> chatroomFlux)
+  {
+    chatroomFlux
+        .map(ChatRoomTo::from)
+        .subscribe(chatroomTo -> repository.save(chatroomTo));
+  }
+
+  @Override
+  public Flux<ChatRoom> read()
+  {
+    return Flux
+        .fromIterable(repository.findAll())
+        .map(chatRoomTo -> new ChatRoom(
+            UUID.fromString(chatRoomTo.getId()),
+            chatRoomTo.getName(),
+            clock,
+            factory.create(
+                Flux
+                    .fromIterable(chatRoomTo.getMessages())
+                    .map(messageTo -> messageTo.toMessage())),
+            bufferSize));
+  }
+}
index 549faada760b41421321616db423523ead66b002..dc998ab4f52ae8c631a01a2cc3d4fb67d17f66a8 100644 (file)
@@ -20,11 +20,11 @@ public abstract class AbstractStorageStrategyIT
 
 
   protected abstract StorageStrategy getStorageStrategy();
-  protected abstract Supplier<ChatHomeService> chatHomeServiceSupplier();
+  protected abstract Supplier<ChatHomeService> getChatHomeServiceSupplier();
 
   protected void start()
   {
-    chathome = new ChatHome(chatHomeServiceSupplier().get());
+    chathome = new ChatHome(getChatHomeServiceSupplier().get());
   }
 
   protected void stop()
@@ -40,10 +40,10 @@ public abstract class AbstractStorageStrategyIT
     assertThat(chathome.getChatRooms().toStream()).hasSize(0);
 
     ChatRoom chatroom = chathome.createChatroom("FOO").block();
-    Message m1 = chatroom.addMessage(1l,"Peter", "Hallo, ich heiße Peter!").block();
-    Message m2 = chatroom.addMessage(1l, "Ute", "Ich bin Ute...").block();
-    Message m3 = chatroom.addMessage(2l, "Peter", "Willst du mit mir gehen?").block();
-    Message m4 = chatroom.addMessage(1l, "Klaus", "Ja? Nein? Vielleicht??").block();
+    Message m1 = chatroom.addMessage(1l,"peter", "Hallo, ich heiße Peter!").block();
+    Message m2 = chatroom.addMessage(1l, "ute", "Ich bin Ute...").block();
+    Message m3 = chatroom.addMessage(2l, "peter", "Willst du mit mir gehen?").block();
+    Message m4 = chatroom.addMessage(1l, "klaus", "Ja? Nein? Vielleicht??").block();
 
     assertThat(chathome.getChatRooms().toStream()).containsExactlyElementsOf(List.of(chatroom));
     assertThat(chathome.getChatRoom(chatroom.getId())).emitsExactly(chatroom);
index 93bfc8ff3cc79f8b2acd3df0c9cf258370805f86..5c88f10ef5f447ddd35b9647ff09771a480b465c 100644 (file)
@@ -50,7 +50,7 @@ public class InMemoryWithFilesStorageStrategyIT extends AbstractStorageStrategyI
   }
 
   @Override
-  protected Supplier<ChatHomeService> chatHomeServiceSupplier()
+  protected Supplier<ChatHomeService> getChatHomeServiceSupplier()
   {
     return () -> new InMemoryChatHomeService(getStorageStrategy().read(), clock, 8);
   }
diff --git a/src/test/java/de/juplo/kafka/chat/backend/persistence/InMemoryWithMongoDbStorageStrategyIT.java b/src/test/java/de/juplo/kafka/chat/backend/persistence/InMemoryWithMongoDbStorageStrategyIT.java
new file mode 100644 (file)
index 0000000..d094808
--- /dev/null
@@ -0,0 +1,103 @@
+package de.juplo.kafka.chat.backend.persistence;
+
+import de.juplo.kafka.chat.backend.domain.ChatHomeService;
+import de.juplo.kafka.chat.backend.persistence.inmemory.InMemoryChatHomeService;
+import de.juplo.kafka.chat.backend.persistence.InMemoryWithMongoDbStorageStrategyIT.DataSourceInitializer;
+import de.juplo.kafka.chat.backend.persistence.inmemory.InMemoryChatRoomService;
+import de.juplo.kafka.chat.backend.persistence.storage.mongodb.ChatRoomRepository;
+import de.juplo.kafka.chat.backend.persistence.storage.mongodb.MongoDbStorageStrategy;
+import lombok.extern.slf4j.Slf4j;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
+import org.springframework.boot.test.autoconfigure.data.mongo.AutoConfigureDataMongo;
+import org.springframework.boot.test.context.TestConfiguration;
+import org.springframework.context.ApplicationContextInitializer;
+import org.springframework.context.ConfigurableApplicationContext;
+import org.springframework.context.annotation.Bean;
+import org.springframework.data.mongodb.core.MongoTemplate;
+import org.springframework.test.context.ContextConfiguration;
+import org.springframework.test.context.junit.jupiter.SpringExtension;
+import org.springframework.test.context.support.TestPropertySourceUtils;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+
+import java.time.Clock;
+import java.util.function.Supplier;
+
+
+@Testcontainers
+@ExtendWith({SpringExtension.class})
+@EnableAutoConfiguration
+@AutoConfigureDataMongo
+@ContextConfiguration(initializers = DataSourceInitializer.class)
+@Slf4j
+public class InMemoryWithMongoDbStorageStrategyIT extends AbstractStorageStrategyIT
+{
+  @Autowired
+  MongoDbStorageStrategy storageStrategy;
+  @Autowired
+  Clock clock;
+
+
+  @Override
+  protected StorageStrategy getStorageStrategy()
+  {
+    return storageStrategy;
+  }
+
+  @Override
+  protected Supplier<ChatHomeService> getChatHomeServiceSupplier()
+  {
+    return () -> new InMemoryChatHomeService(getStorageStrategy().read(), clock, 8);
+  }
+
+
+  @TestConfiguration
+  static class InMemoryWithMongoDbStorageStrategyITConfig
+  {
+    @Bean
+    MongoDbStorageStrategy storageStrategy(
+        ChatRoomRepository chatRoomRepository,
+        Clock clock)
+    {
+      return new MongoDbStorageStrategy(
+          chatRoomRepository,
+          clock,
+          8,
+          messageFlux -> new InMemoryChatRoomService(messageFlux));
+    }
+
+    @Bean
+    Clock clock()
+    {
+      return Clock.systemDefaultZone();
+    }
+  }
+
+  private static final int MONGODB_PORT = 27017;
+
+  @Container
+  private static final GenericContainer CONTAINER =
+      new GenericContainer("mongo:6")
+          .withEnv("MONGO_INITDB_DATABASE", "test")
+          .withExposedPorts(MONGODB_PORT);
+
+  public static class DataSourceInitializer
+      implements ApplicationContextInitializer<ConfigurableApplicationContext>
+  {
+    @Override
+    public void initialize(ConfigurableApplicationContext applicationContext)
+    {
+      TestPropertySourceUtils.addInlinedPropertiesToEnvironment(
+          applicationContext,
+          "spring.data.mongodb.host=localhost",
+          "spring.data.mongodb.port=" + CONTAINER.getMappedPort(MONGODB_PORT),
+          "spring.data.mongodb.database=test");
+
+      Slf4jLogConsumer logConsumer = new Slf4jLogConsumer(log);
+      CONTAINER.followOutput(logConsumer);    }
+  }
+}
diff --git a/src/test/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/MessageToTest.java b/src/test/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/MessageToTest.java
new file mode 100644 (file)
index 0000000..33a8a50
--- /dev/null
@@ -0,0 +1,37 @@
+package de.juplo.kafka.chat.backend.persistence.storage.mongodb;
+
+import de.juplo.kafka.chat.backend.domain.Message;
+import org.junit.jupiter.api.Test;
+
+import java.time.LocalDateTime;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+
+public class MessageToTest
+{
+  @Test
+  void testFrom()
+  {
+    Message message = new Message(
+        Message.MessageKey.of("ute", 1l),
+        6l,
+        LocalDateTime.now(),
+        "foo");
+    MessageTo messageTo = MessageTo.from(message);
+    assertThat(messageTo.getId()).isEqualTo("ute--1");
+  }
+
+  @Test
+  void testToMessage()
+  {
+    MessageTo messageTo = new MessageTo(
+        "ute--1",
+        6l,
+        LocalDateTime.now().toString(),
+        "foo");
+    Message message = messageTo.toMessage();
+    assertThat(message.getId()).isEqualTo(1l);
+    assertThat(message.getUsername()).isEqualTo("ute");
+  }
+}