1 package de.juplo.kafka.chat.backend.persistence.kafka;
3 import de.juplo.kafka.chat.backend.domain.ChatRoomService;
4 import de.juplo.kafka.chat.backend.domain.Message;
5 import de.juplo.kafka.chat.backend.domain.MessageMutationException;
6 import lombok.RequiredArgsConstructor;
7 import lombok.extern.slf4j.Slf4j;
8 import org.apache.kafka.clients.producer.ProducerRecord;
9 import reactor.core.publisher.Flux;
10 import reactor.core.publisher.Mono;
12 import java.time.LocalDateTime;
13 import java.util.LinkedHashMap;
14 import java.util.UUID;
17 @RequiredArgsConstructor
19 public class KafkaChatRoomService implements ChatRoomService
21 private final KafkaChatHomeService kafkaChatHomeService;
22 private final UUID chatRoomId;
24 private final LinkedHashMap<Message.MessageKey, Message> messages = new LinkedHashMap<>();
28 public Mono<Message> persistMessage(
29 Message.MessageKey key,
30 LocalDateTime timestamp,
33 return kafkaChatHomeService
34 .sendMessage(chatRoomId, key, timestamp, text)
35 .doOnSuccess(message -> persistMessage(message));
38 public void persistMessage(Message message)
40 messages.put(message.getKey(), message)
44 synchronized public Mono<Message> getMessage(Message.MessageKey key)
46 // TODO: Aufrufe, auf eine Nachricht (einge gewisse Zeit) warten lassen
47 // und dann bedienen, wenn der der Callback vom Producer aufgerufen wird?
48 return Mono.fromSupplier(() -> messages.get(key));
52 synchronized public Flux<Message> getMessages(long first, long last)
54 return Flux.fromStream(messages
59 long serial = message.getSerialNumber();
60 return serial >= first && serial <= last;