throw new InvalidUsernameException(user);
Message.MessageKey key = Message.MessageKey.of(user, id);
- return service
- .getMessage(key)
- .flatMap(existing -> text.equals(existing.getMessageText())
- ? Mono.just(existing)
- : service
- .persistMessage(key, LocalDateTime.now(clock), text)
- .doOnNext(m ->
- {
- Sinks.EmitResult result = sink.tryEmitNext(m);
- if (result.isFailure())
- {
- log.warn("Emitting of message failed with {} for {}", result.name(), m);
- }
- }));
+ Mono<Message> mono = service
+ .getMessage(key)
+ .handle((existing, sink) ->
+ {
+ if (existing.getMessageText().equals(text))
+ {
+ sink.next(existing);
+ }
+ else
+ {
+ sink.error(new MessageMutationException(existing, text));
+ }
+ });
+
+ return mono.<Mono<Message>>switchIfEmpty(service
+ .persistMessage(key, LocalDateTime.now(clock), text)
+ .<Mono<Message>>doOnNext(m ->
+ {
+ Sinks.EmitResult result = sink.tryEmitNext(m);
+ if (result.isFailure())
+ {
+ log.warn("Emitting of message failed with {} for {}", result.name(), m);
+ }
+ }));
}