import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
import de.juplo.kafka.chat.backend.persistence.storage.files.ChatRoomServiceFactory;
import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
import java.time.Clock;
@RequiredArgsConstructor
+@Slf4j
public class MongoDbStorageStrategy implements StorageStrategy
{
private final ChatHomeRepository repository;
chatroomFlux
.log()
.map(ChatRoomTo::from)
- .subscribe(chatroom -> repository.save(chatroom));
+ .flatMap(chatroom -> repository.save(chatroom))
+ .doOnNext(chatRoomTo -> log.debug("Written: {}", chatRoomTo))
+ .blockLast();
}
@Override