From: Kai Moritz Date: Tue, 10 Jan 2023 17:34:57 +0000 (+0100) Subject: WIP X-Git-Tag: TEST~8 X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=50b9a4965277b3b624b0544e00f6bf185ab126ba;p=demos%2Fkafka%2Fchat WIP --- diff --git a/pom.xml b/pom.xml index 7521e141..4d3d183a 100644 --- a/pom.xml +++ b/pom.xml @@ -5,7 +5,7 @@ org.springframework.boot spring-boot-starter-parent - 3.0.0 + 3.0.1 de.juplo.kafka @@ -40,8 +40,8 @@ true - org.springframework.data - spring-data-mongodb + org.springframework.boot + spring-boot-starter-data-mongodb-reactive org.springframework.boot @@ -59,6 +59,12 @@ mockito-core test + + de.flapdoodle.embed + de.flapdoodle.embed.mongo + 4.3.2 + test + diff --git a/src/main/java/de/juplo/kafka/chat/backend/ChatBackendConfiguration.java b/src/main/java/de/juplo/kafka/chat/backend/ChatBackendConfiguration.java index 87965ab9..0cca42d3 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/ChatBackendConfiguration.java +++ b/src/main/java/de/juplo/kafka/chat/backend/ChatBackendConfiguration.java @@ -10,6 +10,7 @@ import de.juplo.kafka.chat.backend.persistence.inmemory.InMemoryChatRoomService; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import org.springframework.data.mongodb.repository.config.EnableReactiveMongoRepositories; import java.nio.file.Paths; import java.time.Clock; @@ -17,6 +18,7 @@ import java.time.Clock; @Configuration @EnableConfigurationProperties(ChatBackendProperties.class) +@EnableReactiveMongoRepositories public class ChatBackendConfiguration { @Bean diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/ChatHomeRepository.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/ChatHomeRepository.java index cc038c1c..afb30fc4 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/ChatHomeRepository.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/ChatHomeRepository.java @@ -1,11 +1,8 @@ package de.juplo.kafka.chat.backend.persistence.storage.mongodb; -import org.springframework.data.mongodb.repository.MongoRepository; +import org.springframework.data.mongodb.repository.ReactiveMongoRepository; -import java.util.List; - -public interface ChatHomeRepository extends MongoRepository +public interface ChatHomeRepository extends ReactiveMongoRepository { - public List findAll(); } diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/ChatRoomTo.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/ChatRoomTo.java index 701998ef..f4f3ac65 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/ChatRoomTo.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/ChatRoomTo.java @@ -6,7 +6,6 @@ import org.springframework.data.annotation.Id; import org.springframework.data.mongodb.core.mapping.DBRef; import java.util.List; -import java.util.UUID; @AllArgsConstructor @@ -23,7 +22,6 @@ public class ChatRoomTo @DBRef private List messages; - public static ChatRoomTo from(ChatRoom chatroom) { return new ChatRoomTo( diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/MessageTo.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/MessageTo.java index c61a1cc6..8e37166e 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/MessageTo.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/MessageTo.java @@ -4,7 +4,6 @@ import de.juplo.kafka.chat.backend.domain.Message; import lombok.*; import java.time.LocalDateTime; -import java.util.UUID; import java.util.regex.Matcher; import java.util.regex.Pattern; diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/MongoDbStorageStrategy.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/MongoDbStorageStrategy.java index ac249fc7..73f5e8a3 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/MongoDbStorageStrategy.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/MongoDbStorageStrategy.java @@ -2,32 +2,45 @@ package de.juplo.kafka.chat.backend.persistence.storage.mongodb; import de.juplo.kafka.chat.backend.domain.ChatRoom; import de.juplo.kafka.chat.backend.persistence.StorageStrategy; +import de.juplo.kafka.chat.backend.persistence.storage.files.ChatRoomServiceFactory; +import lombok.RequiredArgsConstructor; import reactor.core.publisher.Flux; -import java.io.IOException; +import java.time.Clock; +import java.util.UUID; +@RequiredArgsConstructor public class MongoDbStorageStrategy implements StorageStrategy { + private final ChatHomeRepository repository; + private final Clock clock; + private final int bufferSize; + private final ChatRoomServiceFactory factory; + + @Override public void write(Flux chatroomFlux) { chatroomFlux .log() .map(ChatRoomTo::from) - .subscribe(chatroom -> - { - }); - } - catch (IOException e) - { - throw new RuntimeException(e); - } + .subscribe(chatroom -> repository.save(chatroom)); } @Override public Flux read() { - return null; + return repository + .findAll() + .map(chatRoomTo -> new ChatRoom( + UUID.fromString(chatRoomTo.getId()), + chatRoomTo.getName(), + clock, + factory.create( + Flux + .fromIterable(chatRoomTo.getMessages()) + .map(messageTo -> messageTo.toMessage())), + bufferSize)); } }