1 package de.juplo.kafka.chat.backend.persistence.kafka;
3 import de.juplo.kafka.chat.backend.domain.ChatRoomService;
4 import de.juplo.kafka.chat.backend.domain.Message;import lombok.RequiredArgsConstructor;
5 import lombok.extern.slf4j.Slf4j;
6 import reactor.core.publisher.Flux;
7 import reactor.core.publisher.Mono;
9 import java.time.LocalDateTime;
10 import java.util.LinkedHashMap;
11 import java.util.UUID;
14 @RequiredArgsConstructor
16 public class KafkaChatRoomService implements ChatRoomService
18 private final ChatMessageChannel chatMessageChannel;
19 private final UUID chatRoomId;
21 private final LinkedHashMap<Message.MessageKey, Message> messages = new LinkedHashMap<>();
25 public Mono<Message> persistMessage(
26 Message.MessageKey key,
27 LocalDateTime timestamp,
30 return chatMessageChannel
31 .sendMessage(chatRoomId, key, timestamp, text)
32 .doOnSuccess(message -> persistMessage(message));
35 public void persistMessage(Message message)
37 messages.put(message.getKey(), message);
41 synchronized public Mono<Message> getMessage(Message.MessageKey key)
43 return Mono.fromSupplier(() -> messages.get(key));
47 synchronized public Flux<Message> getMessages(long first, long last)
49 return Flux.fromStream(messages
54 long serial = message.getSerialNumber();
55 return serial >= first && serial <= last;