1 package de.juplo.kafka.chat.backend.persistence.kafka;
3 import de.juplo.kafka.chat.backend.domain.ChatRoomService;
4 import de.juplo.kafka.chat.backend.domain.Message;
5 import de.juplo.kafka.chat.backend.domain.MessageMutationException;
6 import lombok.RequiredArgsConstructor;
7 import lombok.extern.slf4j.Slf4j;
8 import org.apache.kafka.clients.producer.Producer;
9 import org.apache.kafka.common.TopicPartition;
10 import reactor.core.publisher.Flux;
11 import reactor.core.publisher.Mono;
13 import java.time.LocalDateTime;
14 import java.util.LinkedHashMap;
18 @RequiredArgsConstructor
19 public class KafkaChatRoomService implements ChatRoomService
// Kafka producer for MessageTo values keyed by String. Supplied through the
// constructor Lombok generates (@RequiredArgsConstructor covers final fields
// that have no initializer).
21 private final Producer<String, MessageTo> producer;
// Topic partition, also supplied through the generated constructor.
// NOTE(review): presumably the partition this chat room is bound to - confirm against the caller.
22 private final TopicPartition tp;
// Messages held in memory, addressed by their key. A LinkedHashMap iterates
// in insertion order.
24 private final LinkedHashMap<Message.MessageKey, Message> messages = new LinkedHashMap<>();
// Starts at zero. Written with an upper-case L suffix, because a lower-case
// l is easily misread as the digit 1.
// NOTE(review): not used in the visible part of the class; presumably a Kafka offset - confirm.
26 private long offset = 0L;
// Creates a Message from the given key, timestamp and text and stores it in
// the in-memory map.
// NOTE(review): parts of this method are not visible here (the remaining
// parameter list, the lookup that yields `existing`, and the return
// statement), so only the visible statements are described.
30 public Message persistMessage(
31 Message.MessageKey key,
32 LocalDateTime timestamp,
// Error signal carrying a MessageMutationException built from `existing` and
// `text`. The supplier form creates the exception lazily rather than at
// assembly time.
// NOTE(review): `existing` is presumably a message already stored under the same key - confirm.
36 Mono.error(() -> new MessageMutationException(existing, text)));
// The current number of stored messages is passed as the second constructor
// argument (presumably the serial number - confirm against Message).
37 Message message = new Message(key, (long)messages.size(), timestamp, text);
// Register the new message under its own key.
38 messages.put(message.getKey(), message);
// Looks up a single message by its key in the in-memory map.
// The returned Mono emits the stored message, or completes empty when no
// message is stored under the key.
43 public Mono<Message> getMessage(Message.MessageKey key)
// The map lookup is deferred until subscription. Mono.fromSupplier turns a
// null result (unknown key) into an empty completion instead of an error.
45 return Mono.fromSupplier(() -> messages.get(key));
49 public Flux<Message> getMessages(long first, long last)
51 return Flux.fromStream(messages
56 long serial = message.getSerialNumber();
57 return serial >= first && serial <= last;