WIP:refactor
authorKai Moritz <kai@juplo.de>
Wed, 25 Jan 2023 17:35:10 +0000 (18:35 +0100)
committerKai Moritz <kai@juplo.de>
Wed, 25 Jan 2023 17:35:10 +0000 (18:35 +0100)
src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoom.java

index 9ae967b..dd7321e 100644 (file)
@@ -65,20 +65,30 @@ public class ChatRoom
       throw new InvalidUsernameException(user);
 
     Message.MessageKey key = Message.MessageKey.of(user, id);
-    return service
-        .getMessage(key)
-        .flatMap(existing -> text.equals(existing.getMessageText())
-            ? Mono.just(existing)
-            : service
-              .persistMessage(key, LocalDateTime.now(clock), text)
-              .doOnNext(m ->
-              {
-                Sinks.EmitResult result = sink.tryEmitNext(m);
-                if (result.isFailure())
-                {
-                  log.warn("Emitting of message failed with {} for {}", result.name(), m);
-                }
-              }));
+    Mono<Message> mono = service
+      .getMessage(key)
+      .handle((existing, sink) ->
+      {
+        if (existing.getMessageText().equals(text))
+        {
+          sink.next(existing);
+        }
+        else
+        {
+          sink.error(new MessageMutationException(existing, text));
+        }
+      });
+
+      return mono.<Mono<Message>>switchIfEmpty(service
+    .persistMessage(key, LocalDateTime.now(clock), text)
+    .<Mono<Message>>doOnNext(m ->
+    {
+      Sinks.EmitResult result = sink.tryEmitNext(m);
+      if (result.isFailure())
+      {
+        log.warn("Emitting of message failed with {} for {}", result.name(), m);
+      }
+    }));
   }