<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
- <version>3.0.0</version>
+ <version>3.0.1</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>de.juplo.kafka</groupId>
<optional>true</optional>
</dependency>
<dependency>
- <groupId>org.springframework.data</groupId>
- <artifactId>spring-data-mongodb</artifactId>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-starter-data-mongodb-reactive</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>de.flapdoodle.embed</groupId>
+ <artifactId>de.flapdoodle.embed.mongo</artifactId>
+ <version>4.3.2</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
+import org.springframework.data.mongodb.repository.config.EnableReactiveMongoRepositories;
import java.nio.file.Paths;
import java.time.Clock;
@Configuration
@EnableConfigurationProperties(ChatBackendProperties.class)
+@EnableReactiveMongoRepositories
public class ChatBackendConfiguration
{
@Bean
package de.juplo.kafka.chat.backend.persistence.storage.mongodb;
-import org.springframework.data.mongodb.repository.MongoRepository;
+import org.springframework.data.mongodb.repository.ReactiveMongoRepository;
-import java.util.List;
-
-public interface ChatHomeRepository extends MongoRepository<ChatRoomTo, String>
+public interface ChatHomeRepository extends ReactiveMongoRepository<ChatRoomTo, String>
{
- public List<ChatRoomTo> findAll();
}
import org.springframework.data.mongodb.core.mapping.DBRef;
import java.util.List;
-import java.util.UUID;
@AllArgsConstructor
@DBRef
private List<MessageTo> messages;
-
public static ChatRoomTo from(ChatRoom chatroom)
{
return new ChatRoomTo(
import lombok.*;
import java.time.LocalDateTime;
-import java.util.UUID;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import de.juplo.kafka.chat.backend.domain.ChatRoom;
import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
+import de.juplo.kafka.chat.backend.persistence.storage.files.ChatRoomServiceFactory;
+import lombok.RequiredArgsConstructor;
import reactor.core.publisher.Flux;
-import java.io.IOException;
+import java.time.Clock;
+import java.util.UUID;
+@RequiredArgsConstructor
public class MongoDbStorageStrategy implements StorageStrategy
{
+ private final ChatHomeRepository repository;
+ private final Clock clock;
+ private final int bufferSize;
+ private final ChatRoomServiceFactory factory;
+
+
@Override
public void write(Flux<ChatRoom> chatroomFlux)
{
chatroomFlux
.log()
.map(ChatRoomTo::from)
- .subscribe(chatroom ->
- {
- });
- }
- catch (IOException e)
- {
- throw new RuntimeException(e);
- }
+ .subscribe(chatroom -> repository.save(chatroom));
}
@Override
public Flux<ChatRoom> read()
{
- return null;
+ return repository
+ .findAll()
+ .map(chatRoomTo -> new ChatRoom(
+ UUID.fromString(chatRoomTo.getId()),
+ chatRoomTo.getName(),
+ clock,
+ factory.create(
+ Flux
+ .fromIterable(chatRoomTo.getMessages())
+ .map(messageTo -> messageTo.toMessage())),
+ bufferSize));
}
}