X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fchat%2Fbackend%2Fdomain%2FChatroom.java;h=df6794aa72064874176c18d822afc981f6a8cd71;hb=6df4d75042bf7dc6737b5de8ef2bc9d3a79e9a89;hp=62c97327c4409dcccf283cf198d3ad2fde70c093;hpb=f0113ff34d7a4147ce54116680b611061244f39a;p=demos%2Fkafka%2Fchat diff --git a/src/main/java/de/juplo/kafka/chat/backend/domain/Chatroom.java b/src/main/java/de/juplo/kafka/chat/backend/domain/Chatroom.java index 62c97327..df6794aa 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/domain/Chatroom.java +++ b/src/main/java/de/juplo/kafka/chat/backend/domain/Chatroom.java @@ -2,7 +2,6 @@ package de.juplo.kafka.chat.backend.domain; import lombok.Getter; import lombok.RequiredArgsConstructor; -import lombok.Value; import lombok.extern.slf4j.Slf4j; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -10,7 +9,6 @@ import reactor.core.publisher.Sinks; import java.time.LocalDateTime; import java.util.*; -import java.util.stream.Stream; @RequiredArgsConstructor @@ -21,7 +19,7 @@ public class Chatroom private final UUID id; @Getter private final String name; - private final LinkedHashMap messages = new LinkedHashMap<>(); + private final LinkedHashMap messages = new LinkedHashMap<>(); private final Sinks.Many sink = Sinks.many().multicast().onBackpressureBuffer(); synchronized public Mono addMessage( @@ -40,9 +38,9 @@ public class Chatroom String user, String text) { - Message message = new Message(id, (long)messages.size(), timestamp, user, text); + Message.MessageKey key = Message.MessageKey.of(user, id); + Message message = new Message(key, (long)messages.size(), timestamp, text); - MessageKey key = new MessageKey(user, id); Message existing = messages.get(key); if (existing != null) { @@ -62,7 +60,7 @@ public class Chatroom { return Mono.fromSupplier(() -> { - MessageKey key = MessageKey.of(username, messageId); + Message.MessageKey key = Message.MessageKey.of(username, messageId); return messages.get(key); }); } @@ -72,19 +70,15 @@ public class Chatroom return sink.asFlux(); } - public Stream getMessages(long firstMessage) + public Flux getMessages(long first, long last) { - return messages + return Flux.fromStream(messages .values() .stream() - .filter(message -> message.getSerialNumber() >= firstMessage); - } - - - @Value(staticConstructor = "of") - static class MessageKey - { - String username; - Long messageId; + .filter(message -> + { + long serial = message.getSerialNumber(); + return serial >= first && serial <= last; + })); } }