From: Kai Moritz Date: Sun, 3 Sep 2023 21:13:25 +0000 (+0200) Subject: WIP:refactor: Renamed `ChatRoom` into `ChatRoomData` - Aligned Code X-Git-Tag: rebase--2023-09-05--23-53~23 X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=b11c581522746d6cd11e97d79bac890874675304;p=demos%2Fkafka%2Fchat WIP:refactor: Renamed `ChatRoom` into `ChatRoomData` - Aligned Code --- diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/MessageRepository.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/MessageRepository.java new file mode 100644 index 00000000..d80d5fe4 --- /dev/null +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/MessageRepository.java @@ -0,0 +1,8 @@ +package de.juplo.kafka.chat.backend.persistence.storage.mongodb; + +import org.springframework.data.mongodb.repository.MongoRepository; + + +public interface MessageRepository extends MongoRepository +{ +} diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/MongoDbStorageStrategy.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/MongoDbStorageStrategy.java index 772c6e42..41b1d208 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/MongoDbStorageStrategy.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/MongoDbStorageStrategy.java @@ -17,9 +17,11 @@ import java.util.UUID; @Slf4j public class MongoDbStorageStrategy implements StorageStrategy { - private final ChatRoomRepository repository; + private final ChatRoomRepository chatRoomRepository; + private final MessageRepository messageRepository; private final Clock clock; private final int bufferSize; + private final ShardingStrategy shardingStrategy; private final ChatRoomServiceFactory factory; @@ -28,24 +30,29 @@ public class MongoDbStorageStrategy implements StorageStrategy { chatRoomInfoFlux .map(ChatRoomTo::from) - .subscribe(chatroomTo -> repository.save(chatroomTo)); + .subscribe(chatroomTo -> chatRoomRepository.save(chatroomTo)); } @Override public Flux readChatRoomInfo() { return Flux - .fromIterable(repository.findAll()) + .fromIterable(chatRoomRepository.findAll()) .map(chatRoomTo -> { UUID chatRoomId = UUID.fromString(chatRoomTo.getId()); - return new ChatRoomData( - clock, - factory.create( - Flux - .fromIterable(chatRoomTo.getMessages()) - .map(messageTo -> messageTo.toMessage())), - bufferSize); + int shard = shardingStrategy.selectShard(chatRoomId); + + log.info( + "{} - old shard: {}, new shard: {}", + chatRoomId, + chatRoomTo.getShard(), + shard); + + return new ChatRoomInfo( + chatRoomId, + chatRoomTo.getName(), + shard); }); } @@ -53,15 +60,15 @@ public class MongoDbStorageStrategy implements StorageStrategy public void writeChatRoomData(Flux chatRoomDataFlux) { chatRoomDataFlux - .map(ChatRoomTo::from) - .subscribe(chatroomTo -> repository.save(chatroomTo)); + .flatMap(ChatRoomTo::from) + .subscribe(chatroomTo -> chatRoomRepository.save(chatroomTo)); } @Override public Flux readChatRoomData() { return Flux - .fromIterable(repository.findAll()) + .fromIterable(chatRoomRepository.findAll()) .map(chatRoomTo -> { UUID chatRoomId = UUID.fromString(chatRoomTo.getId());