- Switched the return-type of `Chatroom.addMessage()` to `Mono<Message>`.
- Added an inner method, that "persists" the message and also returns
`Mono<Message>`.
- `addMessage()` calls `persistMessage()`, peeks into the subscription and
emits the `Message` to an internal `Sink.multy()` of the `Chatroom`.
- `Chatroom.listen()` creates a `Flux` from the internal Sink, so that
multiple subscribers can listen on the sink and all retrieve the current
messages in parallel.
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-web</artifactId>
+ <artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
</dependency>
<dependency>
import de.juplo.kafka.chatroom.domain.Chatroom;
import de.juplo.kafka.chatroom.domain.Message;
import lombok.RequiredArgsConstructor;
import de.juplo.kafka.chatroom.domain.Chatroom;
import de.juplo.kafka.chatroom.domain.Message;
import lombok.RequiredArgsConstructor;
+import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.bind.annotation.*;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import java.time.Duration;
import java.time.LocalDateTime;
import java.time.LocalDateTime;
+import java.time.LocalTime;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
}
@PutMapping("post/{chatroomId}/{username}/{messageId}")
}
@PutMapping("post/{chatroomId}/{username}/{messageId}")
+ public Mono<MessageTo> post(
@PathVariable UUID chatroomId,
@PathVariable String username,
@PathVariable UUID messageId,
@PathVariable UUID chatroomId,
@PathVariable String username,
@PathVariable UUID messageId,
- @RequestBody String message)
+ @RequestBody String text)
chatrooms
.get(chatroomId)
.addMessage(
messageId,
LocalDateTime.now(clock),
username,
chatrooms
.get(chatroomId)
.addMessage(
messageId,
LocalDateTime.now(clock),
username,
+ text)
+ .map(message -> MessageTo.from(message));
+
+ @GetMapping(
+ path = "listen/{chatroomId}",
+ produces = MediaType.TEXT_EVENT_STREAM_VALUE)
+ public Flux<Message> listen(@PathVariable UUID chatroomId)
+ {
+ return chatrooms.get(chatroomId).listen();
+ }}
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.core.publisher.Sinks;
import java.time.LocalDateTime;
import java.util.LinkedList;
import java.time.LocalDateTime;
import java.util.LinkedList;
public class Chatroom
{
@Getter
public class Chatroom
{
@Getter
@Getter
private final String name;
private final List<Message> messages = new LinkedList<>();
@Getter
private final String name;
private final List<Message> messages = new LinkedList<>();
+ private final Sinks.Many<Message> sink = Sinks.many().multicast().onBackpressureBuffer();
- synchronized public Message addMessage(
+ synchronized public Mono<Message> addMessage(
+ UUID id,
+ LocalDateTime timestamp,
+ String user,
+ String text)
+ {
+ return persist(id, timestamp, user, text)
+ .doOnNext(message -> sink.tryEmitNext(message).orThrow());
+ }
+
+ private Mono<Message> persistMessage(
UUID id,
LocalDateTime timestamp,
String user,
UUID id,
LocalDateTime timestamp,
String user,
{
Message message = new Message(id, (long)messages.size(), timestamp, user, text);
messages.add(message);
{
Message message = new Message(id, (long)messages.size(), timestamp, user, text);
messages.add(message);
+ return Mono
+ .fromSupplier(() -> message)
+ .log();
+ }
+
+ public Flux<Message> listen()
+ {
+ return sink.asFlux();
}
public Stream<Message> getMessages(long firstMessage)
}
public Stream<Message> getMessages(long firstMessage)