From 02ebef6224bfd00d74c9a6ccf467f0387a926df1 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Wed, 21 Dec 2022 18:57:23 +0100 Subject: [PATCH] feat: implemented a listen-method for the chat-service based on a Flux - Switched the return-type of `Chatroom.addMessage()` to `Mono`. - Added an inner method, that "persists" the message and also returns `Mono`. - `addMessage()` calls `persistMessage()`, peeks into the subscription and emits the `Message` to an internal `Sink.multy()` of the `Chatroom`. - `Chatroom.listen()` creates a `Flux` from the internal Sink, so that multiple subscribers can listen on the sink and all retrieve the current messages in parallel. --- pom.xml | 2 +- .../chatroom/api/ChatroomController.java | 22 ++++++++++++--- .../juplo/kafka/chatroom/domain/Chatroom.java | 27 +++++++++++++++++-- 3 files changed, 44 insertions(+), 7 deletions(-) diff --git a/pom.xml b/pom.xml index d5e287b1..2e4e98fd 100644 --- a/pom.xml +++ b/pom.xml @@ -19,7 +19,7 @@ org.springframework.boot - spring-boot-starter-web + spring-boot-starter-webflux diff --git a/src/main/java/de/juplo/kafka/chatroom/api/ChatroomController.java b/src/main/java/de/juplo/kafka/chatroom/api/ChatroomController.java index e3146a9d..5fabf363 100644 --- a/src/main/java/de/juplo/kafka/chatroom/api/ChatroomController.java +++ b/src/main/java/de/juplo/kafka/chatroom/api/ChatroomController.java @@ -3,10 +3,15 @@ package de.juplo.kafka.chatroom.api; import de.juplo.kafka.chatroom.domain.Chatroom; import de.juplo.kafka.chatroom.domain.Message; import lombok.RequiredArgsConstructor; +import org.springframework.http.MediaType; import org.springframework.web.bind.annotation.*; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; import java.time.Clock; +import java.time.Duration; import java.time.LocalDateTime; +import java.time.LocalTime; import java.util.Collection; import java.util.HashMap; import java.util.Map; @@ -36,19 +41,28 @@ public class ChatroomController } @PutMapping("post/{chatroomId}/{username}/{messageId}") - public MessageTo post( + public Mono post( @PathVariable UUID chatroomId, @PathVariable String username, @PathVariable UUID messageId, - @RequestBody String message) + @RequestBody String text) { - return MessageTo.from( + return chatrooms .get(chatroomId) .addMessage( messageId, LocalDateTime.now(clock), username, - message)); + text) + .map(message -> MessageTo.from(message)); + } + + @GetMapping( + path = "listen/{chatroomId}", + produces = MediaType.TEXT_EVENT_STREAM_VALUE) + public Flux listen(@PathVariable UUID chatroomId) + { + return chatrooms.get(chatroomId).listen(); } } diff --git a/src/main/java/de/juplo/kafka/chatroom/domain/Chatroom.java b/src/main/java/de/juplo/kafka/chatroom/domain/Chatroom.java index fd7d73aa..f87a0883 100644 --- a/src/main/java/de/juplo/kafka/chatroom/domain/Chatroom.java +++ b/src/main/java/de/juplo/kafka/chatroom/domain/Chatroom.java @@ -2,6 +2,10 @@ package de.juplo.kafka.chatroom.domain; import lombok.Getter; import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.core.publisher.Sinks; import java.time.LocalDateTime; import java.util.LinkedList; @@ -11,6 +15,7 @@ import java.util.stream.Stream; @RequiredArgsConstructor +@Slf4j public class Chatroom { @Getter @@ -18,8 +23,19 @@ public class Chatroom @Getter private final String name; private final List messages = new LinkedList<>(); + private final Sinks.Many sink = Sinks.many().multicast().onBackpressureBuffer(); - synchronized public Message addMessage( + synchronized public Mono addMessage( + UUID id, + LocalDateTime timestamp, + String user, + String text) + { + return persistMessage(id, timestamp, user, text) + .doOnNext(message -> sink.tryEmitNext(message).orThrow()); + } + + private Mono persistMessage( UUID id, LocalDateTime timestamp, String user, @@ -27,7 +43,14 @@ public class Chatroom { Message message = new Message(id, (long)messages.size(), timestamp, user, text); messages.add(message); - return message; + return Mono + .fromSupplier(() -> message) + .log(); + } + + public Flux listen() + { + return sink.asFlux(); } public Stream getMessages(long firstMessage) -- 2.20.1