X-Git-Url: http://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fchat%2Fbackend%2Fstorage%2Fmongodb%2FMongoDbStorageStrategy.java;h=1d742a5d06694be65045268d90e476b6cdbc0739;hb=46cafb65876ccec33ef4e9948fad2e4aa526039a;hp=c87036c9a1732cb57c3a7a03cf9b9bba7d6c6042;hpb=73fe0a2d380cd9ed8b2da561ae7531d90c0ef9c5;p=demos%2Fkafka%2Fchat diff --git a/src/main/java/de/juplo/kafka/chat/backend/storage/mongodb/MongoDbStorageStrategy.java b/src/main/java/de/juplo/kafka/chat/backend/storage/mongodb/MongoDbStorageStrategy.java index c87036c9..1d742a5d 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/storage/mongodb/MongoDbStorageStrategy.java +++ b/src/main/java/de/juplo/kafka/chat/backend/storage/mongodb/MongoDbStorageStrategy.java @@ -25,29 +25,17 @@ public class MongoDbStorageStrategy implements StorageStrategy { chatRoomInfoFlux .map(ChatRoomTo::from) - .subscribe(chatroomTo -> chatRoomRepository.save(chatroomTo)); + .flatMap(chatroomTo -> chatRoomRepository.save(chatroomTo)); } @Override public Flux readChatRoomInfo() { - return Flux - .fromIterable(chatRoomRepository.findAll()) + return chatRoomRepository.findAll() .map(chatRoomTo -> { UUID chatRoomId = UUID.fromString(chatRoomTo.getId()); - int shard = shardingStrategy.selectShard(chatRoomId); - - log.info( - "{} - old shard: {}, new shard: {}", - chatRoomId, - chatRoomTo.getShard(), - shard); - - return new ChatRoomInfo( - chatRoomId, - chatRoomTo.getName(), - shard); + return new ChatRoomInfo(chatRoomId, chatRoomTo.getName(), null); }); } @@ -56,14 +44,14 @@ public class MongoDbStorageStrategy implements StorageStrategy { messageFlux .map(message -> MessageTo.from(chatRoomId, message)) - .subscribe(messageTo -> messageRepository.save(messageTo)); + .flatMap(messageTo -> messageRepository.save(messageTo)) + .subscribe(); } @Override public Flux readChatRoomData(UUID chatRoomId) { - return Flux - .fromIterable(messageRepository.findByChatRoomIdOrderBySerialAsc(chatRoomId.toString())) + return messageRepository.findByChatRoomIdOrderBySerialAsc(chatRoomId.toString()) .map(messageTo -> messageTo.toMessage()); } }