refactor: `ChatRoomService.persistMessage(..)` returns a `Mono<Message>`
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / domain / ChatRoom.java
index cc5c5a0..da5eba2 100644 (file)
@@ -7,6 +7,7 @@ import lombok.extern.slf4j.Slf4j;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 import reactor.core.publisher.Sinks;
+import reactor.core.publisher.SynchronousSink;
 
 import java.time.Clock;
 import java.time.LocalDateTime;
@@ -25,6 +26,8 @@ public class ChatRoom
   private final UUID id;
   @Getter
   private final String name;
+  @Getter
+  private final int shard;
   private final Clock clock;
   private final ChatRoomService service;
   private final int bufferSize;
@@ -34,6 +37,7 @@ public class ChatRoom
   public ChatRoom(
       UUID id,
       String name,
+      int shard,
       Clock clock,
       ChatRoomService service,
       int bufferSize)
@@ -41,6 +45,7 @@ public class ChatRoom
     log.info("Created ChatRoom {} with buffer-size {}", id, bufferSize);
     this.id = id;
     this.name = name;
+    this.shard = shard;
     this.clock = clock;
     this.service = service;
     this.bufferSize = bufferSize;
@@ -63,12 +68,20 @@ public class ChatRoom
     Message.MessageKey key = Message.MessageKey.of(user, id);
     return service
         .getMessage(key)
-        .flatMap(existing -> text.equals(existing.getMessageText())
-            ? Mono.just(existing)
-            : Mono.error(() -> new MessageMutationException(existing, text)))
+        .handle((Message existing, SynchronousSink<Message> sink) ->
+        {
+          if (existing.getMessageText().equals(text))
+          {
+            sink.next(existing);
+          }
+          else
+          {
+            sink.error(new MessageMutationException(existing, text));
+          }
+        })
         .switchIfEmpty(
             Mono
-                .fromSupplier(() ->service.persistMessage(key, LocalDateTime.now(clock), text))
+                .defer(() -> service.persistMessage(key, LocalDateTime.now(clock), text))
                 .doOnNext(m ->
                 {
                   Sinks.EmitResult result = sink.tryEmitNext(m);