feat: implemented a listen-method for the chat-service based on a Flux
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chatroom / domain / Chatroom.java
index fd7d73a..cf1aff0 100644 (file)
@@ -2,6 +2,10 @@ package de.juplo.kafka.chatroom.domain;
 
 import lombok.Getter;
 import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.core.publisher.Sinks;
 
 import java.time.LocalDateTime;
 import java.util.LinkedList;
@@ -11,6 +15,7 @@ import java.util.stream.Stream;
 
 
 @RequiredArgsConstructor
+@Slf4j
 public class Chatroom
 {
   @Getter
@@ -18,8 +23,19 @@ public class Chatroom
   @Getter
   private final String name;
   private final List<Message> messages = new LinkedList<>();
+  private final Sinks.Many<Message> sink = Sinks.many().multicast().onBackpressureBuffer();
 
-  synchronized public Message addMessage(
+  synchronized public Mono<Message> addMessage(
+      UUID id,
+      LocalDateTime timestamp,
+      String user,
+      String text)
+  {
+    return persist(id, timestamp, user, text)
+        .doOnNext(message -> sink.tryEmitNext(message).orThrow());
+  }
+
+  private Mono<Message> persistMessage(
       UUID id,
       LocalDateTime timestamp,
       String user,
@@ -27,7 +43,14 @@ public class Chatroom
   {
     Message message = new Message(id, (long)messages.size(), timestamp, user, text);
     messages.add(message);
-    return message;
+    return Mono
+        .fromSupplier(() -> message)
+        .log();
+  }
+
+  public Flux<Message> listen()
+  {
+    return sink.asFlux();
   }
 
   public Stream<Message> getMessages(long firstMessage)