X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fchatroom%2Fdomain%2FChatroom.java;h=f87a088350c229f80df55cfe7121c7519170abb4;hb=974327a0e3a970af36e3ace4a8fb17d8a800b4a3;hp=fd7d73aa3bbaf7e81631a0d4fd2a23e4782109f2;hpb=a8164a477f0aa94f5c6e21dcf5c2ef8a8627402b;p=demos%2Fkafka%2Fchat diff --git a/src/main/java/de/juplo/kafka/chatroom/domain/Chatroom.java b/src/main/java/de/juplo/kafka/chatroom/domain/Chatroom.java index fd7d73aa..f87a0883 100644 --- a/src/main/java/de/juplo/kafka/chatroom/domain/Chatroom.java +++ b/src/main/java/de/juplo/kafka/chatroom/domain/Chatroom.java @@ -2,6 +2,10 @@ package de.juplo.kafka.chatroom.domain; import lombok.Getter; import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.core.publisher.Sinks; import java.time.LocalDateTime; import java.util.LinkedList; @@ -11,6 +15,7 @@ import java.util.stream.Stream; @RequiredArgsConstructor +@Slf4j public class Chatroom { @Getter @@ -18,8 +23,19 @@ public class Chatroom @Getter private final String name; private final List messages = new LinkedList<>(); + private final Sinks.Many sink = Sinks.many().multicast().onBackpressureBuffer(); - synchronized public Message addMessage( + synchronized public Mono addMessage( + UUID id, + LocalDateTime timestamp, + String user, + String text) + { + return persistMessage(id, timestamp, user, text) + .doOnNext(message -> sink.tryEmitNext(message).orThrow()); + } + + private Mono persistMessage( UUID id, LocalDateTime timestamp, String user, @@ -27,7 +43,14 @@ public class Chatroom { Message message = new Message(id, (long)messages.size(), timestamp, user, text); messages.add(message); - return message; + return Mono + .fromSupplier(() -> message) + .log(); + } + + public Flux listen() + { + return sink.asFlux(); } public Stream getMessages(long firstMessage)