From 87e071dbdc4012b28bb486a40496148bc578b4dc Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Wed, 25 Jan 2023 18:47:10 +0100 Subject: [PATCH] WIP:refactor --- .../de/juplo/kafka/chat/backend/domain/ChatRoom.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoom.java b/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoom.java index 88099c04..9ea4cef3 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoom.java +++ b/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoom.java @@ -7,6 +7,7 @@ import lombok.extern.slf4j.Slf4j; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.publisher.Sinks; +import reactor.core.publisher.SynchronousSink; import java.time.Clock; import java.time.LocalDateTime; @@ -66,9 +67,9 @@ public class ChatRoom Message.MessageKey key = Message.MessageKey.of(user, id); - Mono mono = service + return service .getMessage(key) - .handle((existing, sink) -> + .handle((Message existing, SynchronousSink sink) -> { if (existing.getMessageText().equals(text)) { @@ -78,9 +79,8 @@ public class ChatRoom { sink.error(new MessageMutationException(existing, text)); } - }); - - return mono.switchIfEmpty(Mono.defer(() -> service + }) + .switchIfEmpty(Mono.defer(() -> service .persistMessage(key, LocalDateTime.now(clock), text) .doOnNext(m -> { -- 2.20.1