1 package de.juplo.kafka.chat.backend.persistence.kafka;
3 import de.juplo.kafka.chat.backend.domain.ChatRoomService;
4 import de.juplo.kafka.chat.backend.domain.Message;
5 import de.juplo.kafka.chat.backend.domain.MessageMutationException;
6 import lombok.RequiredArgsConstructor;
7 import lombok.extern.slf4j.Slf4j;
8 import org.apache.kafka.clients.producer.Producer;
9 import org.apache.kafka.clients.producer.ProducerRecord;
10 import org.apache.kafka.common.TopicPartition;
11 import reactor.core.publisher.Flux;
12 import reactor.core.publisher.Mono;
14 import java.time.LocalDateTime;
15 import java.time.ZoneOffset;
16 import java.util.LinkedHashMap;
17 import java.util.UUID;
21 @RequiredArgsConstructor
22 public class KafkaChatRoomService implements ChatRoomService
// Kafka producer through which persistMessage publishes MessageTo values.
24 private final Producer<String, MessageTo> producer;
// Topic partition handed to this service; its use is not visible in this excerpt.
25 private final TopicPartition tp;
// Id of the chat room; its string form is one of the arguments used to build
// the ProducerRecord in persistMessage (presumably the record key).
26 private final UUID chatRoomId;
// Zone offset used to convert the LocalDateTime timestamp via toEpochSecond.
27 private final ZoneOffset zoneOffset;
// Local store of messages by key: written in the producer send callback and
// read by getMessage / getMessages.
// NOTE(review): LinkedHashMap is not synchronized. Confirm that the send
// callback and the readers can never run concurrently, or guard the map.
29 private final LinkedHashMap<Message.MessageKey, Message> messages = new LinkedHashMap<>();
// NOTE(review): not read or written in the visible lines. The lowercase `l`
// suffix is easily misread as the digit 1; `0L` is the conventional spelling.
31 private long offset = 0l;
/**
 * Publishes a message through the Kafka producer and signals the outcome on
 * the returned {@code Mono} from within the producer's send callback.
 *
 * <p>Visible behaviour: a {@code ProducerRecord} is assembled from the
 * timestamp (converted with {@code toEpochSecond(zoneOffset)}), the chat room
 * id and a {@code MessageTo} carrying username, message id and text. Inside the
 * callback, the message stored under {@code key} is looked up. If its text
 * equals the new text the existing message is kept; otherwise the sink is
 * failed with a {@code MessageMutationException}. A freshly created
 * {@code Message} receives {@code metadata.offset()} as its second constructor
 * argument (presumably the serial number) and is put into {@code messages}.
 * An exception reported by the producer is forwarded with {@code sink.error}.
 *
 * <p>NOTE(review): several lines of this method are missing from this excerpt
 * (the {@code text} parameter declaration, the {@code ProducerRecord}
 * constructor call, the branch conditions and the success emissions). Confirm
 * the exact control flow against the complete file.
 *
 * <p>NOTE(review): Kafka documents the {@code ProducerRecord} timestamp as
 * milliseconds since epoch, while {@code toEpochSecond} yields seconds.
 * Confirm which unit the consumers of this topic expect.
 *
 * @param key       key (username and message id) identifying the message
 * @param timestamp creation time of the message
 * @return a {@code Mono} driven by the send callback; it is failed with
 *         {@code MessageMutationException} or with the producer's exception
 *         on the error paths shown here
 */
35 public Mono<Message> persistMessage(
36 Message.MessageKey key,
37 LocalDateTime timestamp,
40 return Mono.create(sink ->
42 ProducerRecord<String, MessageTo> record =
46 timestamp.toEpochSecond(zoneOffset),
47 chatRoomId.toString(),
48 MessageTo.of(key.getUsername(), key.getMessageId(), text));
// The callback receives the record metadata and an exception from the producer.
50 producer.send(record, ((metadata, exception) ->
// Look for a message already stored under the same key.
55 Message message = messages.get(key);
// Same key and same text: the existing message is kept.
58 if (message.getMessageText().equals(text))
60 // Warn and emit existing message
62 "Keeping existing message with {}@{} for {}",
63 message.getSerialNumber(),
64 message.getTimestamp(), key);
68 // Emit error and abort
69 sink.error(new MessageMutationException(message, text));
// New message: the offset reported by the producer is passed to the Message.
76 message = new Message(key, metadata.offset(), timestamp, text);
77 messages.put(message.getKey(), message);
// Propagate the producer's exception to the subscriber.
85 sink.error(exception);
/**
 * Looks up a single message in the local {@code messages} map.
 *
 * <p>The lookup is wrapped in {@code Mono.fromSupplier}, so it runs when the
 * {@code Mono} is subscribed to. Per Reactor's documentation, a supplier that
 * returns {@code null} (here: no entry for {@code key}) yields an empty
 * {@code Mono}.
 *
 * @param key key of the requested message
 * @return a {@code Mono} emitting the stored message, or empty if none exists
 */
92 public Mono<Message> getMessage(Message.MessageKey key)
94 return Mono.fromSupplier(() -> messages.get(key));
/**
 * Returns stored messages as a {@code Flux} built from a stream over
 * {@code messages}.
 *
 * <p>The visible predicate accepts a message when its serial number lies in
 * the inclusive range {@code [first, last]}.
 *
 * <p>NOTE(review): only part of the stream pipeline is visible in this
 * excerpt. Confirm against the full file how the predicate is applied and
 * whether any further stages follow.
 *
 * @param first lowest serial number to include
 * @param last  highest serial number to include
 * @return a {@code Flux} over the messages taken from the local store
 */
98 public Flux<Message> getMessages(long first, long last)
100 return Flux.fromStream(messages
105 long serial = message.getSerialNumber();
// Inclusive on both ends.
106 return serial >= first && serial <= last;