From: Kai Moritz Date: Mon, 9 Jan 2023 21:49:44 +0000 (+0100) Subject: feat: Implemented and tested `MongoDbStorageStrategy` X-Git-Tag: wip-sharding~43 X-Git-Url: https://juplo.de/gitweb/?a=commitdiff_plain;ds=sidebyside;h=009435373fa6adf7ec9e375ef71a545c8c8f010d;p=demos%2Fkafka%2Fchat feat: Implemented and tested `MongoDbStorageStrategy` - Beware: The version 4.0.0 of Spring Data MongoDB that is included in Spring Boot 3.0.x does not work with version 4.8.1 of the MongoDB drivers that are included. Therefore, the version of the drivers was downgraded to 4.7.2 - See: https://docs.spring.io/spring-data/mongodb/docs/current/reference/html/#compatibility.matrix - Also tried an upgrade of Spring Boot from 3.0.0 to 3.0.1, but without luck. - Beware: Spring Boot 3.x does not include an autoconfigured embedded version of MongoDB for testing. It removed the autoconfiguration for `de.flapdoodle.embed:de.flapdoodle.embed.mongo` ! --- diff --git a/pom.xml b/pom.xml index fc9e7633..d460239e 100644 --- a/pom.xml +++ b/pom.xml @@ -5,7 +5,7 @@ org.springframework.boot spring-boot-starter-parent - 3.0.0 + 3.0.1 de.juplo.kafka @@ -16,7 +16,19 @@ 17 1.0.8 + 4.7.2 + + + + org.testcontainers + testcontainers-bom + 1.17.6 + pom + import + + + org.springframework.boot @@ -39,6 +51,10 @@ lombok true + + org.springframework.boot + spring-boot-starter-data-mongodb + org.springframework.boot spring-boot-starter-test @@ -55,6 +71,16 @@ mockito-core test + + org.testcontainers + testcontainers + test + + + org.testcontainers + junit-jupiter + test + diff --git a/src/main/java/de/juplo/kafka/chat/backend/ChatBackendConfiguration.java b/src/main/java/de/juplo/kafka/chat/backend/ChatBackendConfiguration.java index 87965ab9..0cca42d3 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/ChatBackendConfiguration.java +++ b/src/main/java/de/juplo/kafka/chat/backend/ChatBackendConfiguration.java @@ -10,6 +10,7 @@ import de.juplo.kafka.chat.backend.persistence.inmemory.InMemoryChatRoomService; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import org.springframework.data.mongodb.repository.config.EnableReactiveMongoRepositories; import java.nio.file.Paths; import java.time.Clock; @@ -17,6 +18,7 @@ import java.time.Clock; @Configuration @EnableConfigurationProperties(ChatBackendProperties.class) +@EnableReactiveMongoRepositories public class ChatBackendConfiguration { @Bean diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryChatHomeService.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryChatHomeService.java index 4eea6451..96515bfc 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryChatHomeService.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryChatHomeService.java @@ -28,7 +28,7 @@ public class InMemoryChatHomeService implements ChatHomeService(); - chatroomFlux.subscribe(chatroom -> chatrooms.put(chatroom.getId(), chatroom)); + chatroomFlux.toStream().forEach(chatroom -> chatrooms.put(chatroom.getId(), chatroom)); this.clock = clock; this.bufferSize = bufferSize; } diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/ChatRoomRepository.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/ChatRoomRepository.java new file mode 100644 index 00000000..12e5b968 --- /dev/null +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/ChatRoomRepository.java @@ -0,0 +1,8 @@ +package de.juplo.kafka.chat.backend.persistence.storage.mongodb; + +import org.springframework.data.mongodb.repository.MongoRepository; + + +public interface ChatRoomRepository extends MongoRepository +{ +} diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/ChatRoomTo.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/ChatRoomTo.java new file mode 100644 index 00000000..1ad8d178 --- /dev/null +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/ChatRoomTo.java @@ -0,0 +1,36 @@ +package de.juplo.kafka.chat.backend.persistence.storage.mongodb; + +import de.juplo.kafka.chat.backend.domain.ChatRoom; +import lombok.*; +import org.springframework.data.annotation.Id; +import org.springframework.data.mongodb.core.mapping.Document; + +import java.util.List; + + +@AllArgsConstructor +@NoArgsConstructor +@Getter(AccessLevel.PACKAGE) +@Setter(AccessLevel.PACKAGE) +@EqualsAndHashCode(of = { "id" }) +@ToString(of = { "id", "name" }) +@Document +public class ChatRoomTo +{ + @Id + private String id; + private String name; + private List messages; + + public static ChatRoomTo from(ChatRoom chatroom) + { + return new ChatRoomTo( + chatroom.getId().toString(), + chatroom.getName(), + chatroom + .getMessages() + .map(MessageTo::from) + .collectList() + .block()); + } +} diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/MessageTo.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/MessageTo.java new file mode 100644 index 00000000..4f93695c --- /dev/null +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/MessageTo.java @@ -0,0 +1,48 @@ +package de.juplo.kafka.chat.backend.persistence.storage.mongodb; + +import de.juplo.kafka.chat.backend.domain.Message; +import lombok.*; + +import java.time.LocalDateTime; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + + +@AllArgsConstructor +@NoArgsConstructor +@Getter(AccessLevel.PACKAGE) +@Setter(AccessLevel.PACKAGE) +@EqualsAndHashCode(of = { "user", "id" }) +@ToString(of = { "user", "id" }) +class MessageTo +{ + final static Pattern SPLIT_ID = Pattern.compile("^([a-z-0-9]+)--([0-9]+)$"); + private String id; + private Long serial; + private String time; + private String text; + + Message toMessage() + { + Matcher matcher = SPLIT_ID.matcher(id); + if (!matcher.matches()) + throw new RuntimeException("MessageTo with invalid ID: " + id); + Long messageId = Long.parseLong(matcher.group(2)); + String user = matcher.group(1); + return new Message( + Message.MessageKey.of(user, messageId), + serial, + LocalDateTime.parse(time), + text); + } + + static MessageTo from(Message message) + { + return + new MessageTo( + message.getUsername() + "--" + message.getId(), + message.getSerialNumber(), + message.getTimestamp().toString(), + message.getMessageText()); + } +} diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/MongoDbStorageStrategy.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/MongoDbStorageStrategy.java new file mode 100644 index 00000000..08ed93b0 --- /dev/null +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/MongoDbStorageStrategy.java @@ -0,0 +1,47 @@ +package de.juplo.kafka.chat.backend.persistence.storage.mongodb; + +import de.juplo.kafka.chat.backend.domain.ChatRoom; +import de.juplo.kafka.chat.backend.persistence.StorageStrategy; +import de.juplo.kafka.chat.backend.persistence.storage.files.ChatRoomServiceFactory; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import reactor.core.publisher.Flux; + +import java.time.Clock; +import java.util.UUID; + + +@RequiredArgsConstructor +@Slf4j +public class MongoDbStorageStrategy implements StorageStrategy +{ + private final ChatRoomRepository repository; + private final Clock clock; + private final int bufferSize; + private final ChatRoomServiceFactory factory; + + + @Override + public void write(Flux chatroomFlux) + { + chatroomFlux + .map(ChatRoomTo::from) + .subscribe(chatroomTo -> repository.save(chatroomTo)); + } + + @Override + public Flux read() + { + return Flux + .fromIterable(repository.findAll()) + .map(chatRoomTo -> new ChatRoom( + UUID.fromString(chatRoomTo.getId()), + chatRoomTo.getName(), + clock, + factory.create( + Flux + .fromIterable(chatRoomTo.getMessages()) + .map(messageTo -> messageTo.toMessage())), + bufferSize)); + } +} diff --git a/src/test/java/de/juplo/kafka/chat/backend/persistence/AbstractStorageStrategyIT.java b/src/test/java/de/juplo/kafka/chat/backend/persistence/AbstractStorageStrategyIT.java index 549faada..dc998ab4 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/persistence/AbstractStorageStrategyIT.java +++ b/src/test/java/de/juplo/kafka/chat/backend/persistence/AbstractStorageStrategyIT.java @@ -20,11 +20,11 @@ public abstract class AbstractStorageStrategyIT protected abstract StorageStrategy getStorageStrategy(); - protected abstract Supplier chatHomeServiceSupplier(); + protected abstract Supplier getChatHomeServiceSupplier(); protected void start() { - chathome = new ChatHome(chatHomeServiceSupplier().get()); + chathome = new ChatHome(getChatHomeServiceSupplier().get()); } protected void stop() @@ -40,10 +40,10 @@ public abstract class AbstractStorageStrategyIT assertThat(chathome.getChatRooms().toStream()).hasSize(0); ChatRoom chatroom = chathome.createChatroom("FOO").block(); - Message m1 = chatroom.addMessage(1l,"Peter", "Hallo, ich heiße Peter!").block(); - Message m2 = chatroom.addMessage(1l, "Ute", "Ich bin Ute...").block(); - Message m3 = chatroom.addMessage(2l, "Peter", "Willst du mit mir gehen?").block(); - Message m4 = chatroom.addMessage(1l, "Klaus", "Ja? Nein? Vielleicht??").block(); + Message m1 = chatroom.addMessage(1l,"peter", "Hallo, ich heiße Peter!").block(); + Message m2 = chatroom.addMessage(1l, "ute", "Ich bin Ute...").block(); + Message m3 = chatroom.addMessage(2l, "peter", "Willst du mit mir gehen?").block(); + Message m4 = chatroom.addMessage(1l, "klaus", "Ja? Nein? Vielleicht??").block(); assertThat(chathome.getChatRooms().toStream()).containsExactlyElementsOf(List.of(chatroom)); assertThat(chathome.getChatRoom(chatroom.getId())).emitsExactly(chatroom); diff --git a/src/test/java/de/juplo/kafka/chat/backend/persistence/InMemoryWithFilesStorageStrategyIT.java b/src/test/java/de/juplo/kafka/chat/backend/persistence/InMemoryWithFilesStorageStrategyIT.java index 93bfc8ff..5c88f10e 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/persistence/InMemoryWithFilesStorageStrategyIT.java +++ b/src/test/java/de/juplo/kafka/chat/backend/persistence/InMemoryWithFilesStorageStrategyIT.java @@ -50,7 +50,7 @@ public class InMemoryWithFilesStorageStrategyIT extends AbstractStorageStrategyI } @Override - protected Supplier chatHomeServiceSupplier() + protected Supplier getChatHomeServiceSupplier() { return () -> new InMemoryChatHomeService(getStorageStrategy().read(), clock, 8); } diff --git a/src/test/java/de/juplo/kafka/chat/backend/persistence/InMemoryWithMongoDbStorageStrategyIT.java b/src/test/java/de/juplo/kafka/chat/backend/persistence/InMemoryWithMongoDbStorageStrategyIT.java new file mode 100644 index 00000000..d0948088 --- /dev/null +++ b/src/test/java/de/juplo/kafka/chat/backend/persistence/InMemoryWithMongoDbStorageStrategyIT.java @@ -0,0 +1,103 @@ +package de.juplo.kafka.chat.backend.persistence; + +import de.juplo.kafka.chat.backend.domain.ChatHomeService; +import de.juplo.kafka.chat.backend.persistence.inmemory.InMemoryChatHomeService; +import de.juplo.kafka.chat.backend.persistence.InMemoryWithMongoDbStorageStrategyIT.DataSourceInitializer; +import de.juplo.kafka.chat.backend.persistence.inmemory.InMemoryChatRoomService; +import de.juplo.kafka.chat.backend.persistence.storage.mongodb.ChatRoomRepository; +import de.juplo.kafka.chat.backend.persistence.storage.mongodb.MongoDbStorageStrategy; +import lombok.extern.slf4j.Slf4j; +import org.junit.jupiter.api.extension.ExtendWith; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.EnableAutoConfiguration; +import org.springframework.boot.test.autoconfigure.data.mongo.AutoConfigureDataMongo; +import org.springframework.boot.test.context.TestConfiguration; +import org.springframework.context.ApplicationContextInitializer; +import org.springframework.context.ConfigurableApplicationContext; +import org.springframework.context.annotation.Bean; +import org.springframework.data.mongodb.core.MongoTemplate; +import org.springframework.test.context.ContextConfiguration; +import org.springframework.test.context.junit.jupiter.SpringExtension; +import org.springframework.test.context.support.TestPropertySourceUtils; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; + +import java.time.Clock; +import java.util.function.Supplier; + + +@Testcontainers +@ExtendWith({SpringExtension.class}) +@EnableAutoConfiguration +@AutoConfigureDataMongo +@ContextConfiguration(initializers = DataSourceInitializer.class) +@Slf4j +public class InMemoryWithMongoDbStorageStrategyIT extends AbstractStorageStrategyIT +{ + @Autowired + MongoDbStorageStrategy storageStrategy; + @Autowired + Clock clock; + + + @Override + protected StorageStrategy getStorageStrategy() + { + return storageStrategy; + } + + @Override + protected Supplier getChatHomeServiceSupplier() + { + return () -> new InMemoryChatHomeService(getStorageStrategy().read(), clock, 8); + } + + + @TestConfiguration + static class InMemoryWithMongoDbStorageStrategyITConfig + { + @Bean + MongoDbStorageStrategy storageStrategy( + ChatRoomRepository chatRoomRepository, + Clock clock) + { + return new MongoDbStorageStrategy( + chatRoomRepository, + clock, + 8, + messageFlux -> new InMemoryChatRoomService(messageFlux)); + } + + @Bean + Clock clock() + { + return Clock.systemDefaultZone(); + } + } + + private static final int MONGODB_PORT = 27017; + + @Container + private static final GenericContainer CONTAINER = + new GenericContainer("mongo:6") + .withEnv("MONGO_INITDB_DATABASE", "test") + .withExposedPorts(MONGODB_PORT); + + public static class DataSourceInitializer + implements ApplicationContextInitializer + { + @Override + public void initialize(ConfigurableApplicationContext applicationContext) + { + TestPropertySourceUtils.addInlinedPropertiesToEnvironment( + applicationContext, + "spring.data.mongodb.host=localhost", + "spring.data.mongodb.port=" + CONTAINER.getMappedPort(MONGODB_PORT), + "spring.data.mongodb.database=test"); + + Slf4jLogConsumer logConsumer = new Slf4jLogConsumer(log); + CONTAINER.followOutput(logConsumer); } + } +} diff --git a/src/test/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/MessageToTest.java b/src/test/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/MessageToTest.java new file mode 100644 index 00000000..33a8a503 --- /dev/null +++ b/src/test/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/MessageToTest.java @@ -0,0 +1,37 @@ +package de.juplo.kafka.chat.backend.persistence.storage.mongodb; + +import de.juplo.kafka.chat.backend.domain.Message; +import org.junit.jupiter.api.Test; + +import java.time.LocalDateTime; + +import static org.assertj.core.api.Assertions.assertThat; + + +public class MessageToTest +{ + @Test + void testFrom() + { + Message message = new Message( + Message.MessageKey.of("ute", 1l), + 6l, + LocalDateTime.now(), + "foo"); + MessageTo messageTo = MessageTo.from(message); + assertThat(messageTo.getId()).isEqualTo("ute--1"); + } + + @Test + void testToMessage() + { + MessageTo messageTo = new MessageTo( + "ute--1", + 6l, + LocalDateTime.now().toString(), + "foo"); + Message message = messageTo.toMessage(); + assertThat(message.getId()).isEqualTo(1l); + assertThat(message.getUsername()).isEqualTo("ute"); + } +}