From: Kai Moritz Date: Wed, 25 Jan 2023 17:47:10 +0000 (+0100) Subject: WIP:refactor X-Git-Tag: wip~11 X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=87e071dbdc4012b28bb486a40496148bc578b4dc;p=demos%2Fkafka%2Fchat WIP:refactor --- diff --git a/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoom.java b/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoom.java index 88099c04..9ea4cef3 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoom.java +++ b/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoom.java @@ -7,6 +7,7 @@ import lombok.extern.slf4j.Slf4j; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.publisher.Sinks; +import reactor.core.publisher.SynchronousSink; import java.time.Clock; import java.time.LocalDateTime; @@ -66,9 +67,9 @@ public class ChatRoom Message.MessageKey key = Message.MessageKey.of(user, id); - Mono mono = service + return service .getMessage(key) - .handle((existing, sink) -> + .handle((Message existing, SynchronousSink sink) -> { if (existing.getMessageText().equals(text)) { @@ -78,9 +79,8 @@ public class ChatRoom { sink.error(new MessageMutationException(existing, text)); } - }); - - return mono.switchIfEmpty(Mono.defer(() -> service + }) + .switchIfEmpty(Mono.defer(() -> service .persistMessage(key, LocalDateTime.now(clock), text) .doOnNext(m -> {