<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
- <version>3.0.0</version>
+ <version>3.0.1</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>de.juplo.kafka</groupId>
<properties>
<java.version>17</java.version>
<assertj-reactor.version>1.0.8</assertj-reactor.version>
+ <mongodb.version>4.7.2</mongodb.version>
</properties>
+ <dependencyManagement>
+ <dependencies>
+ <dependency>
+ <groupId>org.testcontainers</groupId>
+ <artifactId>testcontainers-bom</artifactId>
+ <version>1.17.6</version>
+ <type>pom</type>
+ <scope>import</scope>
+ </dependency>
+ </dependencies>
+ </dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
+ <dependency>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-starter-data-mongodb</artifactId>
+ </dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.testcontainers</groupId>
+ <artifactId>testcontainers</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.testcontainers</groupId>
+ <artifactId>junit-jupiter</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
+import org.springframework.data.mongodb.repository.config.EnableReactiveMongoRepositories;
import java.nio.file.Paths;
import java.time.Clock;
@Configuration
@EnableConfigurationProperties(ChatBackendProperties.class)
+@EnableReactiveMongoRepositories
public class ChatBackendConfiguration
{
@Bean
{
log.debug("Creating InMemoryChatHomeService with buffer-size {} (for created ChatRoom's)", bufferSize);
this.chatrooms = new HashMap<>();
- chatroomFlux.subscribe(chatroom -> chatrooms.put(chatroom.getId(), chatroom));
+ chatroomFlux.toStream().forEach(chatroom -> chatrooms.put(chatroom.getId(), chatroom));
this.clock = clock;
this.bufferSize = bufferSize;
}
--- /dev/null
+package de.juplo.kafka.chat.backend.persistence.storage.mongodb;
+
+import org.springframework.data.mongodb.repository.MongoRepository;
+
+
+public interface ChatRoomRepository extends MongoRepository<ChatRoomTo, String>
+{
+}
--- /dev/null
+package de.juplo.kafka.chat.backend.persistence.storage.mongodb;
+
+import de.juplo.kafka.chat.backend.domain.ChatRoom;
+import lombok.*;
+import org.springframework.data.annotation.Id;
+import org.springframework.data.mongodb.core.mapping.Document;
+
+import java.util.List;
+
+
+@AllArgsConstructor
+@NoArgsConstructor
+@Getter(AccessLevel.PACKAGE)
+@Setter(AccessLevel.PACKAGE)
+@EqualsAndHashCode(of = { "id" })
+@ToString(of = { "id", "name" })
+@Document
+public class ChatRoomTo
+{
+ @Id
+ private String id;
+ private String name;
+ private List<MessageTo> messages;
+
+ public static ChatRoomTo from(ChatRoom chatroom)
+ {
+ return new ChatRoomTo(
+ chatroom.getId().toString(),
+ chatroom.getName(),
+ chatroom
+ .getMessages()
+ .map(MessageTo::from)
+ .collectList()
+ .block());
+ }
+}
--- /dev/null
+package de.juplo.kafka.chat.backend.persistence.storage.mongodb;
+
+import de.juplo.kafka.chat.backend.domain.Message;
+import lombok.*;
+
+import java.time.LocalDateTime;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+
+@AllArgsConstructor
+@NoArgsConstructor
+@Getter(AccessLevel.PACKAGE)
+@Setter(AccessLevel.PACKAGE)
+@EqualsAndHashCode(of = { "user", "id" })
+@ToString(of = { "user", "id" })
+class MessageTo
+{
+ final static Pattern SPLIT_ID = Pattern.compile("^([a-z-0-9]+)--([0-9]+)$");
+ private String id;
+ private Long serial;
+ private String time;
+ private String text;
+
+ Message toMessage()
+ {
+ Matcher matcher = SPLIT_ID.matcher(id);
+ if (!matcher.matches())
+ throw new RuntimeException("MessageTo with invalid ID: " + id);
+ Long messageId = Long.parseLong(matcher.group(2));
+ String user = matcher.group(1);
+ return new Message(
+ Message.MessageKey.of(user, messageId),
+ serial,
+ LocalDateTime.parse(time),
+ text);
+ }
+
+ static MessageTo from(Message message)
+ {
+ return
+ new MessageTo(
+ message.getUsername() + "--" + message.getId(),
+ message.getSerialNumber(),
+ message.getTimestamp().toString(),
+ message.getMessageText());
+ }
+}
--- /dev/null
+package de.juplo.kafka.chat.backend.persistence.storage.mongodb;
+
+import de.juplo.kafka.chat.backend.domain.ChatRoom;
+import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
+import de.juplo.kafka.chat.backend.persistence.storage.files.ChatRoomServiceFactory;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import reactor.core.publisher.Flux;
+
+import java.time.Clock;
+import java.util.UUID;
+
+
+@RequiredArgsConstructor
+@Slf4j
+public class MongoDbStorageStrategy implements StorageStrategy
+{
+ private final ChatRoomRepository repository;
+ private final Clock clock;
+ private final int bufferSize;
+ private final ChatRoomServiceFactory factory;
+
+
+ @Override
+ public void write(Flux<ChatRoom> chatroomFlux)
+ {
+ chatroomFlux
+ .map(ChatRoomTo::from)
+ .subscribe(chatroomTo -> repository.save(chatroomTo));
+ }
+
+ @Override
+ public Flux<ChatRoom> read()
+ {
+ return Flux
+ .fromIterable(repository.findAll())
+ .map(chatRoomTo -> new ChatRoom(
+ UUID.fromString(chatRoomTo.getId()),
+ chatRoomTo.getName(),
+ clock,
+ factory.create(
+ Flux
+ .fromIterable(chatRoomTo.getMessages())
+ .map(messageTo -> messageTo.toMessage())),
+ bufferSize));
+ }
+}
protected abstract StorageStrategy getStorageStrategy();
- protected abstract Supplier<ChatHomeService> chatHomeServiceSupplier();
+ protected abstract Supplier<ChatHomeService> getChatHomeServiceSupplier();
protected void start()
{
- chathome = new ChatHome(chatHomeServiceSupplier().get());
+ chathome = new ChatHome(getChatHomeServiceSupplier().get());
}
protected void stop()
assertThat(chathome.getChatRooms().toStream()).hasSize(0);
ChatRoom chatroom = chathome.createChatroom("FOO").block();
- Message m1 = chatroom.addMessage(1l,"Peter", "Hallo, ich heiße Peter!").block();
- Message m2 = chatroom.addMessage(1l, "Ute", "Ich bin Ute...").block();
- Message m3 = chatroom.addMessage(2l, "Peter", "Willst du mit mir gehen?").block();
- Message m4 = chatroom.addMessage(1l, "Klaus", "Ja? Nein? Vielleicht??").block();
+ Message m1 = chatroom.addMessage(1l,"peter", "Hallo, ich heiße Peter!").block();
+ Message m2 = chatroom.addMessage(1l, "ute", "Ich bin Ute...").block();
+ Message m3 = chatroom.addMessage(2l, "peter", "Willst du mit mir gehen?").block();
+ Message m4 = chatroom.addMessage(1l, "klaus", "Ja? Nein? Vielleicht??").block();
assertThat(chathome.getChatRooms().toStream()).containsExactlyElementsOf(List.of(chatroom));
assertThat(chathome.getChatRoom(chatroom.getId())).emitsExactly(chatroom);
}
@Override
- protected Supplier<ChatHomeService> chatHomeServiceSupplier()
+ protected Supplier<ChatHomeService> getChatHomeServiceSupplier()
{
return () -> new InMemoryChatHomeService(getStorageStrategy().read(), clock, 8);
}
--- /dev/null
+package de.juplo.kafka.chat.backend.persistence;
+
+import de.juplo.kafka.chat.backend.domain.ChatHomeService;
+import de.juplo.kafka.chat.backend.persistence.inmemory.InMemoryChatHomeService;
+import de.juplo.kafka.chat.backend.persistence.InMemoryWithMongoDbStorageStrategyIT.DataSourceInitializer;
+import de.juplo.kafka.chat.backend.persistence.inmemory.InMemoryChatRoomService;
+import de.juplo.kafka.chat.backend.persistence.storage.mongodb.ChatRoomRepository;
+import de.juplo.kafka.chat.backend.persistence.storage.mongodb.MongoDbStorageStrategy;
+import lombok.extern.slf4j.Slf4j;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
+import org.springframework.boot.test.autoconfigure.data.mongo.AutoConfigureDataMongo;
+import org.springframework.boot.test.context.TestConfiguration;
+import org.springframework.context.ApplicationContextInitializer;
+import org.springframework.context.ConfigurableApplicationContext;
+import org.springframework.context.annotation.Bean;
+import org.springframework.data.mongodb.core.MongoTemplate;
+import org.springframework.test.context.ContextConfiguration;
+import org.springframework.test.context.junit.jupiter.SpringExtension;
+import org.springframework.test.context.support.TestPropertySourceUtils;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+
+import java.time.Clock;
+import java.util.function.Supplier;
+
+
+@Testcontainers
+@ExtendWith({SpringExtension.class})
+@EnableAutoConfiguration
+@AutoConfigureDataMongo
+@ContextConfiguration(initializers = DataSourceInitializer.class)
+@Slf4j
+public class InMemoryWithMongoDbStorageStrategyIT extends AbstractStorageStrategyIT
+{
+ @Autowired
+ MongoDbStorageStrategy storageStrategy;
+ @Autowired
+ Clock clock;
+
+
+ @Override
+ protected StorageStrategy getStorageStrategy()
+ {
+ return storageStrategy;
+ }
+
+ @Override
+ protected Supplier<ChatHomeService> getChatHomeServiceSupplier()
+ {
+ return () -> new InMemoryChatHomeService(getStorageStrategy().read(), clock, 8);
+ }
+
+
+ @TestConfiguration
+ static class InMemoryWithMongoDbStorageStrategyITConfig
+ {
+ @Bean
+ MongoDbStorageStrategy storageStrategy(
+ ChatRoomRepository chatRoomRepository,
+ Clock clock)
+ {
+ return new MongoDbStorageStrategy(
+ chatRoomRepository,
+ clock,
+ 8,
+ messageFlux -> new InMemoryChatRoomService(messageFlux));
+ }
+
+ @Bean
+ Clock clock()
+ {
+ return Clock.systemDefaultZone();
+ }
+ }
+
+ private static final int MONGODB_PORT = 27017;
+
+ @Container
+ private static final GenericContainer CONTAINER =
+ new GenericContainer("mongo:6")
+ .withEnv("MONGO_INITDB_DATABASE", "test")
+ .withExposedPorts(MONGODB_PORT);
+
+ public static class DataSourceInitializer
+ implements ApplicationContextInitializer<ConfigurableApplicationContext>
+ {
+ @Override
+ public void initialize(ConfigurableApplicationContext applicationContext)
+ {
+ TestPropertySourceUtils.addInlinedPropertiesToEnvironment(
+ applicationContext,
+ "spring.data.mongodb.host=localhost",
+ "spring.data.mongodb.port=" + CONTAINER.getMappedPort(MONGODB_PORT),
+ "spring.data.mongodb.database=test");
+
+ Slf4jLogConsumer logConsumer = new Slf4jLogConsumer(log);
+ CONTAINER.followOutput(logConsumer); }
+ }
+}
--- /dev/null
+package de.juplo.kafka.chat.backend.persistence.storage.mongodb;
+
+import de.juplo.kafka.chat.backend.domain.Message;
+import org.junit.jupiter.api.Test;
+
+import java.time.LocalDateTime;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+
+public class MessageToTest
+{
+ @Test
+ void testFrom()
+ {
+ Message message = new Message(
+ Message.MessageKey.of("ute", 1l),
+ 6l,
+ LocalDateTime.now(),
+ "foo");
+ MessageTo messageTo = MessageTo.from(message);
+ assertThat(messageTo.getId()).isEqualTo("ute--1");
+ }
+
+ @Test
+ void testToMessage()
+ {
+ MessageTo messageTo = new MessageTo(
+ "ute--1",
+ 6l,
+ LocalDateTime.now().toString(),
+ "foo");
+ Message message = messageTo.toMessage();
+ assertThat(message.getId()).isEqualTo(1l);
+ assertThat(message.getUsername()).isEqualTo("ute");
+ }
+}