feat: implemented a listen-method for the chat-service based on a Flux
authorKai Moritz <kai@juplo.de>
Wed, 21 Dec 2022 17:57:23 +0000 (18:57 +0100)
committerKai Moritz <kai@juplo.de>
Sun, 15 Jan 2023 18:32:56 +0000 (19:32 +0100)
- Switched the return-type of `Chatroom.addMessage()` to `Mono<Message>`.
- Added an inner method, that "persists" the message and also returns
  `Mono<Message>`.
- `addMessage()` calls `persistMessage()`, peeks into the subscription and
  emits the `Message` to an internal `Sink.multy()` of the `Chatroom`.
- `Chatroom.listen()` creates a `Flux` from the internal Sink, so that
  multiple subscribers can listen on the sink and all retrieve the current
  messages in parallel.

pom.xml
src/main/java/de/juplo/kafka/chatroom/api/ChatroomController.java
src/main/java/de/juplo/kafka/chatroom/domain/Chatroom.java

diff --git a/pom.xml b/pom.xml
index 98ef4af..22d1810 100644 (file)
--- a/pom.xml
+++ b/pom.xml
@@ -19,7 +19,7 @@
   <dependencies>
     <dependency>
       <groupId>org.springframework.boot</groupId>
-      <artifactId>spring-boot-starter-web</artifactId>
+      <artifactId>spring-boot-starter-webflux</artifactId>
     </dependency>
 
     <dependency>
index e3146a9..5fabf36 100644 (file)
@@ -3,10 +3,15 @@ package de.juplo.kafka.chatroom.api;
 import de.juplo.kafka.chatroom.domain.Chatroom;
 import de.juplo.kafka.chatroom.domain.Message;
 import lombok.RequiredArgsConstructor;
+import org.springframework.http.MediaType;
 import org.springframework.web.bind.annotation.*;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
 
 import java.time.Clock;
+import java.time.Duration;
 import java.time.LocalDateTime;
+import java.time.LocalTime;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
@@ -36,19 +41,28 @@ public class ChatroomController
   }
 
   @PutMapping("post/{chatroomId}/{username}/{messageId}")
-  public MessageTo post(
+  public Mono<MessageTo> post(
       @PathVariable UUID chatroomId,
       @PathVariable String username,
       @PathVariable UUID messageId,
-      @RequestBody String message)
+      @RequestBody String text)
   {
-    return MessageTo.from(
+    return
         chatrooms
             .get(chatroomId)
             .addMessage(
                 messageId,
                 LocalDateTime.now(clock),
                 username,
-                message));
+                text)
+            .map(message -> MessageTo.from(message));
+  }
+
+  @GetMapping(
+      path = "listen/{chatroomId}",
+      produces = MediaType.TEXT_EVENT_STREAM_VALUE)
+  public Flux<Message> listen(@PathVariable UUID chatroomId)
+  {
+    return chatrooms.get(chatroomId).listen();
   }
 }
index fd7d73a..f87a088 100644 (file)
@@ -2,6 +2,10 @@ package de.juplo.kafka.chatroom.domain;
 
 import lombok.Getter;
 import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.core.publisher.Sinks;
 
 import java.time.LocalDateTime;
 import java.util.LinkedList;
@@ -11,6 +15,7 @@ import java.util.stream.Stream;
 
 
 @RequiredArgsConstructor
+@Slf4j
 public class Chatroom
 {
   @Getter
@@ -18,8 +23,19 @@ public class Chatroom
   @Getter
   private final String name;
   private final List<Message> messages = new LinkedList<>();
+  private final Sinks.Many<Message> sink = Sinks.many().multicast().onBackpressureBuffer();
 
-  synchronized public Message addMessage(
+  synchronized public Mono<Message> addMessage(
+      UUID id,
+      LocalDateTime timestamp,
+      String user,
+      String text)
+  {
+    return persistMessage(id, timestamp, user, text)
+        .doOnNext(message -> sink.tryEmitNext(message).orThrow());
+  }
+
+  private Mono<Message> persistMessage(
       UUID id,
       LocalDateTime timestamp,
       String user,
@@ -27,7 +43,14 @@ public class Chatroom
   {
     Message message = new Message(id, (long)messages.size(), timestamp, user, text);
     messages.add(message);
-    return message;
+    return Mono
+        .fromSupplier(() -> message)
+        .log();
+  }
+
+  public Flux<Message> listen()
+  {
+    return sink.asFlux();
   }
 
   public Stream<Message> getMessages(long firstMessage)