From 2f099cf12d3cd2ef0c017a335d573cd2dcbf59d3 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Wed, 25 Jan 2023 18:35:10 +0100 Subject: [PATCH] WIP:refactor --- .../kafka/chat/backend/domain/ChatRoom.java | 38 ++++++++++++------- 1 file changed, 24 insertions(+), 14 deletions(-) diff --git a/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoom.java b/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoom.java index 9ae967bd..dd7321ed 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoom.java +++ b/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoom.java @@ -65,20 +65,30 @@ public class ChatRoom throw new InvalidUsernameException(user); Message.MessageKey key = Message.MessageKey.of(user, id); - return service - .getMessage(key) - .flatMap(existing -> text.equals(existing.getMessageText()) - ? Mono.just(existing) - : service - .persistMessage(key, LocalDateTime.now(clock), text) - .doOnNext(m -> - { - Sinks.EmitResult result = sink.tryEmitNext(m); - if (result.isFailure()) - { - log.warn("Emitting of message failed with {} for {}", result.name(), m); - } - })); + Mono mono = service + .getMessage(key) + .handle((existing, sink) -> + { + if (existing.getMessageText().equals(text)) + { + sink.next(existing); + } + else + { + sink.error(new MessageMutationException(existing, text)); + } + }); + + return mono.>switchIfEmpty(service + .persistMessage(key, LocalDateTime.now(clock), text) + .>doOnNext(m -> + { + Sinks.EmitResult result = sink.tryEmitNext(m); + if (result.isFailure()) + { + log.warn("Emitting of message failed with {} for {}", result.name(), m); + } + })); } -- 2.20.1