{
Logger log = LoggerFactory.getLogger(StorageStrategy.class.getCanonicalName());
- default Flux<ChatRoomInfo> write(ChatHomeService chatHomeService)
+ default Mono<Void> write(ChatHomeService chatHomeService)
{
return writeChatRoomInfo(
chatHomeService
.doOnSuccess(emittedChatRoomInfo -> log.info("Stored {}", chatRoomInfo))
.doOnError(throwable -> log.error("Could not store {}: {}", chatRoomInfo, throwable)))
)
- .doOnComplete(() -> log.info("Stored {}", chatHomeService))
+ .then()
+ .doOnSuccess(empty -> log.info("Stored {}", chatHomeService))
.doOnError(throwable -> log.error("Could not store {}: {}", chatHomeService, throwable));
}
import de.juplo.kafka.chat.backend.implementation.StorageStrategy;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
import java.util.UUID;
@Slf4j
public class NoStorageStorageStrategy implements StorageStrategy
{
- public Flux<ChatRoomInfo> write(ChatHomeService chatHomeService)
+ public Mono<Void> write(ChatHomeService chatHomeService)
{
- return Flux
- .<ChatRoomInfo>empty()
- .doOnComplete(() -> log.info("Storage is disabled: Not storing {}", chatHomeService));
+ return Mono
+ .<Void>empty()
+ .doOnSuccess(empty -> log.info("Storage is disabled: Not storing {}", chatHomeService));
}