1 package de.juplo.kafka.chat.backend.implementation.kafka;
3 import de.juplo.kafka.chat.backend.domain.ChatMessageService;
4 import de.juplo.kafka.chat.backend.domain.Message;
6 import lombok.RequiredArgsConstructor;
7 import lombok.extern.slf4j.Slf4j;
8 import reactor.core.publisher.Flux;
9 import reactor.core.publisher.Mono;
11 import java.time.LocalDateTime;
12 import java.util.LinkedHashMap;
13 import java.util.UUID;
16 @RequiredArgsConstructor
18 public class KafkaChatMessageService implements ChatMessageService
20 private final DataChannel dataChannel;
22 private final UUID chatRoomId;
24 private final LinkedHashMap<Message.MessageKey, Message> messages = new LinkedHashMap<>();
28 public Mono<Message> persistMessage(
29 Message.MessageKey key,
30 LocalDateTime timestamp,
34 .sendChatMessage(chatRoomId, key, timestamp, text)
35 .doOnSuccess(message -> persistMessage(message));
38 void persistMessage(Message message)
40 messages.put (message.getKey(), message);
44 synchronized public Mono<Message> getMessage(Message.MessageKey key)
46 return Mono.fromSupplier(() -> messages.get(key));
50 synchronized public Flux<Message> getMessages(long first, long last)
52 return Flux.fromStream(messages
57 long serial = message.getSerialNumber();
58 return serial >= first && serial <= last;