X-Git-Url: http://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fchat%2Fbackend%2Fstorage%2Fmongodb%2FMongoDbStorageStrategy.java;h=972122e0e6efcee0b4cc021299e0fd72f2d95b84;hb=9fbb1a24b8c62619f8e51c5575b70b66fcd99ff8;hp=5f46e3c42e0ac22e68d51a0c5cc3d2c6f6180323;hpb=992de79b80f78c6a57af4711d2a200073e8ca437;p=demos%2Fkafka%2Fchat diff --git a/src/main/java/de/juplo/kafka/chat/backend/storage/mongodb/MongoDbStorageStrategy.java b/src/main/java/de/juplo/kafka/chat/backend/storage/mongodb/MongoDbStorageStrategy.java index 5f46e3c4..972122e0 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/storage/mongodb/MongoDbStorageStrategy.java +++ b/src/main/java/de/juplo/kafka/chat/backend/storage/mongodb/MongoDbStorageStrategy.java @@ -2,8 +2,8 @@ package de.juplo.kafka.chat.backend.storage.mongodb; import de.juplo.kafka.chat.backend.domain.ChatRoomInfo; import de.juplo.kafka.chat.backend.domain.Message; -import de.juplo.kafka.chat.backend.persistence.ShardingStrategy; -import de.juplo.kafka.chat.backend.persistence.StorageStrategy; +import de.juplo.kafka.chat.backend.implementation.ShardingStrategy; +import de.juplo.kafka.chat.backend.implementation.StorageStrategy; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import reactor.core.publisher.Flux; @@ -36,26 +36,21 @@ public class MongoDbStorageStrategy implements StorageStrategy .map(chatRoomTo -> { UUID chatRoomId = UUID.fromString(chatRoomTo.getId()); - int shard = shardingStrategy.selectShard(chatRoomId); - - log.info( - "{} - old shard: {}, new shard: {}", - chatRoomId, - chatRoomTo.getShard(), - shard); - - return new ChatRoomInfo( - chatRoomId, - chatRoomTo.getName(), - shard); + return new ChatRoomInfo(chatRoomId, chatRoomTo.getName(), null); }); } @Override - public void writeChatRoomData(UUID chatRoomId, Flux messageFlux) + public void writeChatRoomData( + UUID chatRoomId, + Flux messageFlux, + SuccessCallback successCallback, + FailureCallback failureCallback) { messageFlux .map(message -> MessageTo.from(chatRoomId, message)) + .doOnComplete(() -> successCallback.accept(chatRoomId)) + .doOnError(throwable -> failureCallback.accept(chatRoomId, throwable)) .subscribe(messageTo -> messageRepository.save(messageTo)); }